API Integration Guide
Status: Complete Implementation Guide
Version: 1.0
Purpose: Step-by-step procedures for implementing robust API integration patterns
Applicable To: Any distributed system requiring reliable service communication
Overview
This guide provides comprehensive procedures for implementing resilient API integration patterns that enable loose coupling, fault tolerance, and efficient resource utilization. The approach emphasizes event-driven communication and asynchronous processing for scalable system architecture.
Key Benefits
- 90% Less Coupling: Event-driven architecture reduces dependencies
- 10x Throughput: Batched async processing improves performance
- 80% Fewer Requests: Server-sent events replace polling
- 95% Fewer Connections: Connection pooling optimizes resources
Event-Driven Communication Setup
Step 1: Event Bus Implementation
Create a central event bus for decoupled communication:
// src/lib/events/event-bus.ts
import { PubSub } from '@google-cloud/pubsub';
export class EventBus {
private pubsub: PubSub;
private topics: Record<string, string>;
constructor() {
this.pubsub = new PubSub({
projectId: process.env.GCP_PROJECT_ID
});
// Topic registry for type safety
this.topics = {
'campaign.created': 'campaign-events',
'campaign.scheduled': 'campaign-events',
'email.sent': 'email-events',
'email.bounced': 'email-events',
'user.signup': 'user-events',
'payment.received': 'payment-events'
};
}
async publish(eventType: string, data: any): Promise<void> {
const topicName = this.topics[eventType];
if (!topicName) {
throw new Error(`Unknown event type: ${eventType}`);
}
const topic = this.pubsub.topic(topicName);
// Structured event format
const message = {
eventType,
timestamp: new Date().toISOString(),
data,
metadata: {
version: '1.0',
source: process.env.SERVICE_NAME,
correlationId: this.generateCorrelationId()
}
};
try {
await topic.publishMessage({
data: Buffer.from(JSON.stringify(message))
});
console.log(`Event published: ${eventType}`);
} catch (error) {
console.error(`Failed to publish event ${eventType}:`, error);
throw error;
}
}
private generateCorrelationId(): string {
return `${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
Step 2: Event Subscription Handler
// src/lib/events/event-handler.ts
export class EventHandler {
private eventBus: EventBus;
private handlers: Map<string, Function[]>;
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
this.handlers = new Map();
}
subscribe(eventType: string, handler: Function): void {
if (!this.handlers.has(eventType)) {
this.handlers.set(eventType, []);
}
this.handlers.get(eventType)!.push(handler);
}
async handleEvent(message: any): Promise<void> {
try {
const event = JSON.parse(message.data.toString());
const handlers = this.handlers.get(event.eventType) || [];
// Process handlers in parallel
await Promise.all(
handlers.map(handler => this.executeHandler(handler, event))
);
// Acknowledge message
message.ack();
} catch (error) {
console.error('Event handling failed:', error);
// Negative acknowledge for retry
message.nack();
}
}
private async executeHandler(handler: Function, event: any): Promise<void> {
try {
await handler(event);
} catch (error) {
console.error(`Handler execution failed for ${event.eventType}:`, error);
// Don't rethrow - other handlers should still execute
}
}
}
Service Integration Patterns
Step 1: Workflow Integration Service
// src/lib/integrations/workflow-integration.ts
import axios from 'axios';
export class WorkflowIntegration {
private baseUrl: string;
private apiKey: string;
private workflows: Record<string, string>;
constructor() {
this.baseUrl = process.env.WORKFLOW_SERVICE_URL!;
this.apiKey = process.env.WORKFLOW_API_KEY!;
// Workflow registry
this.workflows = {
'campaign-send': 'webhook/campaign-send-v2',
'welcome-series': 'webhook/welcome-email-series',
'abandoned-cart': 'webhook/abandoned-cart-recovery',
'data-export': 'webhook/bulk-data-export'
};
}
async triggerWorkflow(workflowName: string, data: any): Promise<WorkflowResult> {
const webhookPath = this.workflows[workflowName];
if (!webhookPath) {
throw new Error(`Unknown workflow: ${workflowName}`);
}
const url = `${this.baseUrl}/${webhookPath}`;
try {
const response = await axios.post(url, {
...data,
_metadata: {
triggeredAt: new Date().toISOString(),
source: process.env.SERVICE_NAME,
version: '1.0'
}
}, {
headers: {
'Authorization': `Bearer ${this.apiKey}`,
'Content-Type': 'application/json'
},
timeout: 5000 // Fast fail for async workflows
});
return {
success: true,
executionId: response.data.executionId
};
} catch (error) {
console.error('Workflow trigger failed:', error.message);
// Queue for retry instead of failing
await this.queueForRetry(workflowName, data);
return {
success: false,
queued: true,
error: error.message
};
}
}
private async queueForRetry(workflowName: string, data: any): Promise<void> {
const retryPayload = {
workflowName,
data,
attempt: 1,
maxAttempts: 3,
retryAt: new Date(Date.now() + 5 * 60 * 1000) // 5 minutes
};
// Store in database or queue service for retry processing
await this.storeRetryJob(retryPayload);
}
private async storeRetryJob(payload: any): Promise<void> {
// Implementation depends on your retry mechanism
// Could be database, Redis, Cloud Tasks, etc.
}
}
interface WorkflowResult {
success: boolean;
executionId?: string;
queued?: boolean;
error?: string;
}
Step 2: Email Service Integration
// src/lib/integrations/email-integration.ts
export class EmailIntegration {
private apiKey: string;
private baseUrl: string;
private rateLimiter: RateLimiter;
constructor() {
this.apiKey = process.env.EMAIL_SERVICE_API_KEY!;
this.baseUrl = process.env.EMAIL_SERVICE_URL!;
this.rateLimiter = new RateLimiter({
maxRequests: 500,
timeWindow: 60000 // 1 minute
});
}
async sendBulkEmail(emails: EmailMessage[]): Promise<BulkEmailResult> {
// Batch emails for optimal throughput
const batches = this.createBatches(emails, 500); // Postmark's limit
const results: BatchResult[] = [];
for (const batch of batches) {
// Rate limiting
await this.rateLimiter.acquire();
try {
const result = await this.sendBatch(batch);
results.push(result);
} catch (error) {
console.error('Batch send failed:', error);
results.push({
success: false,
error: error.message,
batch: batch
});
}
}
return this.aggregateResults(results);
}
private async sendBatch(emails: EmailMessage[]): Promise<BatchResult> {
const response = await axios.post(`${this.baseUrl}/email/batch`, {
Messages: emails.map(email => ({
From: email.from,
To: email.to,
Subject: email.subject,
HtmlBody: email.htmlBody,
TextBody: email.textBody,
MessageStream: email.messageStream || 'outbound',
Tag: email.tag,
Metadata: email.metadata
}))
}, {
headers: {
'X-Postmark-Server-Token': this.apiKey,
'Content-Type': 'application/json'
},
timeout: 30000
});
return {
success: true,
messageIds: response.data.map((r: any) => r.MessageID),
batch: emails
};
}
private createBatches<T>(items: T[], batchSize: number): T[][] {
const batches: T[][] = [];
for (let i = 0; i < items.length; i += batchSize) {
batches.push(items.slice(i, i + batchSize));
}
return batches;
}
private aggregateResults(results: BatchResult[]): BulkEmailResult {
const successful = results.filter(r => r.success);
const failed = results.filter(r => !r.success);
return {
totalBatches: results.length,
successfulBatches: successful.length,
failedBatches: failed.length,
totalEmails: results.reduce((sum, r) => sum + r.batch.length, 0),
successfulEmails: successful.reduce((sum, r) => sum + r.batch.length, 0),
messageIds: successful.flatMap(r => r.messageIds || []),
errors: failed.map(r => r.error).filter(Boolean)
};
}
}
interface EmailMessage {
from: string;
to: string;
subject: string;
htmlBody: string;
textBody?: string;
messageStream?: string;
tag?: string;
metadata?: Record<string, any>;
}
interface BatchResult {
success: boolean;
messageIds?: string[];
batch: EmailMessage[];
error?: string;
}
interface BulkEmailResult {
totalBatches: number;
successfulBatches: number;
failedBatches: number;
totalEmails: number;
successfulEmails: number;
messageIds: string[];
errors: string[];
}
Connection Management
Step 1: Database Connection Pool
// src/lib/database/connection-pool.ts
import { Pool } from 'pg';
export class DatabaseConnectionPool {
private pool: Pool;
private static instance: DatabaseConnectionPool;
private constructor() {
this.pool = new Pool({
host: process.env.DB_HOST,
port: parseInt(process.env.DB_PORT || '5432'),
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
// Connection pool configuration
min: 2, // Minimum connections
max: 10, // Maximum connections
idleTimeoutMillis: 30000, // Close idle connections after 30s
connectionTimeoutMillis: 2000, // Timeout connection attempts after 2s
// SSL configuration
ssl: process.env.NODE_ENV === 'production' ? {
require: true,
rejectUnauthorized: false
} : false
});
// Handle pool events
this.pool.on('connect', (client) => {
console.log('New client connected to database');
});
this.pool.on('error', (err) => {
console.error('Database pool error:', err);
});
}
static getInstance(): DatabaseConnectionPool {
if (!DatabaseConnectionPool.instance) {
DatabaseConnectionPool.instance = new DatabaseConnectionPool();
}
return DatabaseConnectionPool.instance;
}
async query(text: string, params?: any[]): Promise<any> {
const start = Date.now();
try {
const result = await this.pool.query(text, params);
const duration = Date.now() - start;
console.log('Query executed', { text, duration, rows: result.rowCount });
return result;
} catch (error) {
console.error('Query failed', { text, error: error.message });
throw error;
}
}
async transaction<T>(callback: (client: any) => Promise<T>): Promise<T> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async close(): Promise<void> {
await this.pool.end();
}
}
Step 2: Rate Limiting Implementation
// src/lib/utils/rate-limiter.ts
export class RateLimiter {
private requests: number[];
private maxRequests: number;
private timeWindow: number;
constructor(options: { maxRequests: number; timeWindow: number }) {
this.requests = [];
this.maxRequests = options.maxRequests;
this.timeWindow = options.timeWindow;
}
async acquire(): Promise<void> {
const now = Date.now();
// Remove old requests outside the time window
this.requests = this.requests.filter(
timestamp => now - timestamp < this.timeWindow
);
// Check if we've exceeded the limit
if (this.requests.length >= this.maxRequests) {
const oldestRequest = Math.min(...this.requests);
const waitTime = this.timeWindow - (now - oldestRequest);
if (waitTime > 0) {
await this.delay(waitTime);
return this.acquire(); // Recursively try again
}
}
// Record this request
this.requests.push(now);
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
getUsage(): { current: number; max: number; percentage: number } {
const now = Date.now();
const recentRequests = this.requests.filter(
timestamp => now - timestamp < this.timeWindow
);
return {
current: recentRequests.length,
max: this.maxRequests,
percentage: (recentRequests.length / this.maxRequests) * 100
};
}
}
Retry and Circuit Breaker Patterns
Step 1: Retry Mechanism
// src/lib/utils/retry.ts
export class RetryManager {
static async withRetry<T>(
operation: () => Promise<T>,
options: RetryOptions = {}
): Promise<T> {
const {
maxAttempts = 3,
baseDelay = 1000,
maxDelay = 30000,
backoffFactor = 2,
retryCondition = (error: any) => true
} = options;
let lastError: any;
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error;
if (attempt === maxAttempts || !retryCondition(error)) {
throw error;
}
const delay = Math.min(
baseDelay * Math.pow(backoffFactor, attempt - 1),
maxDelay
);
console.log(`Attempt ${attempt} failed, retrying in ${delay}ms:`, error.message);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError;
}
}
interface RetryOptions {
maxAttempts?: number;
baseDelay?: number;
maxDelay?: number;
backoffFactor?: number;
retryCondition?: (error: any) => boolean;
}
Step 2: Circuit Breaker Implementation
// src/lib/utils/circuit-breaker.ts
export class CircuitBreaker {
private failures: number = 0;
private lastFailureTime?: number;
private state: 'CLOSED' | 'OPEN' | 'HALF_OPEN' = 'CLOSED';
constructor(
private failureThreshold: number = 5,
private resetTimeout: number = 60000 // 1 minute
) {}
async execute<T>(operation: () => Promise<T>): Promise<T> {
if (this.state === 'OPEN') {
if (this.shouldAttemptReset()) {
this.state = 'HALF_OPEN';
} else {
throw new Error('Circuit breaker is OPEN');
}
}
try {
const result = await operation();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess(): void {
this.failures = 0;
this.state = 'CLOSED';
}
private onFailure(): void {
this.failures++;
this.lastFailureTime = Date.now();
if (this.failures >= this.failureThreshold) {
this.state = 'OPEN';
}
}
private shouldAttemptReset(): boolean {
return this.lastFailureTime !== undefined &&
(Date.now() - this.lastFailureTime) >= this.resetTimeout;
}
getState(): { state: string; failures: number } {
return {
state: this.state,
failures: this.failures
};
}
}
Integration Monitoring
Step 1: Health Check Implementation
// src/lib/monitoring/health-check.ts
export class HealthCheckManager {
private checks: Map<string, HealthCheck> = new Map();
registerCheck(name: string, check: HealthCheck): void {
this.checks.set(name, check);
}
async runChecks(): Promise<HealthStatus> {
const results: HealthCheckResult[] = [];
for (const [name, check] of this.checks) {
try {
const start = Date.now();
await check.execute();
const duration = Date.now() - start;
results.push({
name,
status: 'healthy',
duration,
timestamp: new Date().toISOString()
});
} catch (error) {
results.push({
name,
status: 'unhealthy',
error: error.message,
timestamp: new Date().toISOString()
});
}
}
const overallStatus = results.every(r => r.status === 'healthy')
? 'healthy'
: 'unhealthy';
return {
status: overallStatus,
checks: results,
timestamp: new Date().toISOString()
};
}
}
interface HealthCheck {
execute(): Promise<void>;
}
interface HealthCheckResult {
name: string;
status: 'healthy' | 'unhealthy';
duration?: number;
error?: string;
timestamp: string;
}
interface HealthStatus {
status: 'healthy' | 'unhealthy';
checks: HealthCheckResult[];
timestamp: string;
}
// Example health checks
export class DatabaseHealthCheck implements HealthCheck {
async execute(): Promise<void> {
const db = DatabaseConnectionPool.getInstance();
await db.query('SELECT 1');
}
}
export class WorkflowServiceHealthCheck implements HealthCheck {
async execute(): Promise<void> {
const response = await axios.get(`${process.env.WORKFLOW_SERVICE_URL}/health`, {
timeout: 5000
});
if (response.status !== 200) {
throw new Error(`Workflow service returned ${response.status}`);
}
}
}
Integration Implementation Checklist
Foundation Setup
- Event bus implementation with proper topic management
- Service registry for endpoint discovery
- Connection pooling for database and external services
- Rate limiting for API calls
- Circuit breaker for fault tolerance
Service Integration
- Workflow service integration with retry logic
- Email service integration with batching
- Database integration with transaction support
- External API integration with error handling
- File storage integration with streaming
Error Handling & Resilience
- Retry mechanisms with exponential backoff
- Circuit breaker implementation
- Dead letter queue for failed messages
- Timeout configuration for all external calls
- Graceful degradation strategies
Monitoring & Observability
- Health checks for all integrated services
- Metrics collection for integration points
- Distributed tracing for request flow
- Alerting for integration failures
- Performance monitoring and optimization
Security & Compliance
- API key management and rotation
- Network security and encryption
- Input validation and sanitization
- Audit logging for sensitive operations
- Compliance with data protection regulations
This guide provides a comprehensive framework for implementing robust API integrations with proper error handling, monitoring, and performance optimization. Customize based on your specific services and requirements.