Building a Connector
This guide walks through creating a new connector for GTM Clarity. The framework provides a standard interface, an example scaffold, and patterns from five production connectors to follow.
Overview
Every connector must:
- Implement the
Connectorinterface fromsrc/connectors/interface.ts - Self-register in the
ConnectorRegistry - Transform source data into canonical schemas (
CanonicalPerson,CanonicalAccount, orCanonicalActivity) - Encrypt credentials at rest using AES-256-GCM
- Provide default field mappings and a health check
Step 1: Copy the Example Directory
Start from the __example__ scaffold:
cp -r src/connectors/__example__ src/connectors/your-connector
This gives you two files:
| File | Purpose |
|---|---|
connector.ts | Main connector class with all interface methods stubbed |
schema.ts | Default field mappings |
Step 2: Implement Authentication
Create an auth.ts file for credential management. Follow the AES-256-GCM pattern used by all connectors:
// src/connectors/your-connector/auth.ts
import { randomBytes, createCipheriv, createDecipheriv } from "crypto";
const ALGORITHM = "aes-256-gcm";
const IV_LENGTH = 12;
const TAG_LENGTH = 16;
function getEnv(name: string): string {
const value = process.env[name];
if (!value) throw new Error(`Missing env var: ${name}`);
return value;
}
export function encryptApiKey(apiKey: string): string {
const key = Buffer.from(getEnv("CONNECTOR_ENCRYPTION_KEY"), "hex");
if (key.length !== 32) {
throw new Error("CONNECTOR_ENCRYPTION_KEY must be 64 hex chars");
}
const iv = randomBytes(IV_LENGTH);
const cipher = createCipheriv(ALGORITHM, key, iv);
const encrypted = Buffer.concat([
cipher.update(apiKey, "utf8"),
cipher.final(),
]);
const tag = cipher.getAuthTag();
return Buffer.concat([iv, tag, encrypted]).toString("base64");
}
export function decryptApiKey(encrypted: string): string {
const key = Buffer.from(getEnv("CONNECTOR_ENCRYPTION_KEY"), "hex");
if (key.length !== 32) {
throw new Error("CONNECTOR_ENCRYPTION_KEY must be 64 hex chars");
}
const combined = Buffer.from(encrypted, "base64");
const iv = combined.subarray(0, IV_LENGTH);
const tag = combined.subarray(IV_LENGTH, IV_LENGTH + TAG_LENGTH);
const ciphertext = combined.subarray(IV_LENGTH + TAG_LENGTH);
const decipher = createDecipheriv(ALGORITHM, key, iv);
decipher.setAuthTag(tag);
return Buffer.concat([
decipher.update(ciphertext),
decipher.final(),
]).toString("utf8");
}
For OAuth-based sources (like Salesforce or O365), you will need HMAC-signed state parameters, token exchange, and refresh logic. See src/connectors/salesforce/oauth.ts or src/connectors/o365/oauth.ts for complete examples.
Step 3: Implement the Connector Class
Update connector.ts with your implementation. Here is the full interface to satisfy:
import type {
Connector,
ConnectorConfig,
ConnectorHealthStatus,
FieldMapping,
SyncResult,
CanonicalPerson,
CanonicalAccount,
CanonicalActivity,
} from "../interface";
import { yourConnectorFieldMappings } from "./schema";
import { decryptApiKey } from "./auth";
import { connectorRegistry } from "../registry";
export class YourConnector implements Connector {
readonly id = "your-connector"; // Unique registry ID
readonly name = "Your Connector"; // Human-readable name
readonly version = "1.0.0";
async authenticate(
config: ConnectorConfig,
): Promise<{ credentials: Record<string, string>; expiresAt?: Date }> {
// 1. Decrypt stored credentials
const apiKey = decryptApiKey(config.settings.encryptedApiKey);
// 2. Validate against the external API
const response = await fetch("https://api.example.com/health", {
headers: { Authorization: `Bearer ${apiKey}` },
});
if (!response.ok) {
throw new Error(`Auth failed: ${response.status}`);
}
// 3. Return credentials (no expiresAt for non-expiring keys)
return { credentials: { apiKey } };
}
async sync(
credentials: Record<string, string>,
lastSyncCursor: string | null,
_options?: { fullSync?: boolean },
): Promise<
SyncResult<CanonicalPerson | CanonicalAccount | CanonicalActivity>
> {
// Implement your sync logic here
// See Step 4 for the sync module pattern
return {
records: [],
cursor: null,
hasMore: false,
syncedAt: new Date(),
stats: { created: 0, updated: 0, errors: 0 },
};
}
getSchema(): FieldMapping[] {
return yourConnectorFieldMappings;
}
async healthCheck(
credentials: Record<string, string>,
): Promise<ConnectorHealthStatus> {
try {
const response = await fetch("https://api.example.com/health", {
headers: { Authorization: `Bearer ${credentials.apiKey}` },
});
return {
healthy: response.ok,
lastSync: null,
error: response.ok ? undefined : `HTTP ${response.status}`,
};
} catch (err) {
return {
healthy: false,
lastSync: null,
error: err instanceof Error ? err.message : String(err),
};
}
}
}
// Self-register -- this line is REQUIRED
connectorRegistry.register(new YourConnector());
The connectorRegistry.register() call at the end of the file is mandatory. Without it, the connector will not appear in connectorRegistry.list() or be discoverable at runtime.
Step 4: Add Sync Logic
Create a sync/ subdirectory with pull.ts and transform.ts files.
pull.ts -- Fetch from External API
Follow the async generator pattern used by all connectors:
// src/connectors/your-connector/sync/pull.ts
export interface YourApiEvent {
id: string;
timestamp: string;
// ... source-specific fields
}
const PAGE_SIZE = 100;
const DEFAULT_BACKFILL_DAYS = 90;
export async function* pullEvents(
apiKey: string,
since: Date | null,
): AsyncGenerator<YourApiEvent[]> {
const sinceIso = since
? since.toISOString()
: new Date(Date.now() - DEFAULT_BACKFILL_DAYS * 86400000).toISOString();
let nextUrl: string | null =
`https://api.example.com/v1/events?since=${sinceIso}&limit=${PAGE_SIZE}`;
while (nextUrl) {
const response = await fetch(nextUrl, {
headers: { Authorization: `Bearer ${apiKey}` },
});
if (!response.ok) {
throw new Error(`API failed: ${response.status}`);
}
const data = await response.json();
if (data.events?.length > 0) {
yield data.events;
}
nextUrl = data.next ?? null;
}
}
If your source API has rate limits, add pacing (delay between requests) and 429 handling with exponential backoff. See src/connectors/fathom/sync/pull.ts for a robust example with Retry-After header support.
transform.ts -- Map to Canonical Schema
Transform functions must be pure -- no side effects, no API calls, no database access:
// src/connectors/your-connector/sync/transform.ts
import type { CanonicalActivity } from "@/connectors/interface";
import type { YourApiEvent } from "./pull";
const EVENT_TYPE_MAP: Record<string, { type: string; channel: string }> = {
page_view: { type: "page_view", channel: "web" },
form_submit: { type: "form_submit", channel: "web" },
};
export function transformEvent(
event: YourApiEvent,
): CanonicalActivity | null {
const mapping = EVENT_TYPE_MAP[event.type];
if (!mapping) return null; // Skip unmapped types
return {
externalId: event.id,
type: mapping.type,
channel: mapping.channel,
occurredAt: new Date(event.timestamp),
personExternalId: event.userId,
properties: {
source: "your-connector",
// ... additional metadata
},
};
}
Step 5: Define Field Mappings
Update schema.ts with your default field mappings:
// src/connectors/your-connector/schema.ts
import type { FieldMapping } from "../interface";
export const yourConnectorFieldMappings: FieldMapping[] = [
{
sourceField: "user_id",
targetField: "personExternalId",
transform: "none",
required: true,
},
{
sourceField: "email",
targetField: "email",
transform: "lowercase",
required: false,
},
{
sourceField: "event_type",
targetField: "type",
transform: "none",
required: true,
},
{
sourceField: "timestamp",
targetField: "occurredAt",
transform: "none",
required: true,
},
];
Available transforms: none, lowercase, uppercase, trim, custom.
Step 6: Add API Routes
Create OAuth or webhook routes in the Next.js API directory:
src/app/api/connectors/your-connector/
callback/route.ts # OAuth callback (if applicable)
webhook/route.ts # Webhook endpoint (if applicable)
For webhook-based connectors, implement signature verification. See the Customer.io webhook handler for the HMAC-SHA256 pattern:
import { createHmac, timingSafeEqual } from "crypto";
export function verifySignature(
body: Buffer,
signature: string,
secret: string,
): boolean {
const expected = createHmac("sha256", secret)
.update(body)
.digest("hex");
if (signature.length !== expected.length) return false;
return timingSafeEqual(
Buffer.from(signature, "utf8"),
Buffer.from(expected, "utf8"),
);
}
Step 7: Add Temporal Activities (Optional)
If your connector needs scheduled syncs or long-running operations, add Temporal activities:
// src/connectors/temporal/your-connector-activities.ts
import { YourConnector } from "../your-connector/connector";
export async function syncYourConnector(
tenantId: string,
connectorId: string,
): Promise<void> {
const connector = new YourConnector();
// ... authenticate, sync, persist
}
Register the activities in src/connectors/temporal/worker.ts.
Step 8: Write Tests
Create tests in the tests/ directory:
tests/connectors/your-connector/
auth.test.ts # Encryption/decryption
connector.test.ts # authenticate, sync, healthCheck
transform.test.ts # Pure transform functions
pull.test.ts # API pull with mocked fetch
The project follows TDD London School (mock-first). Mock external API calls using vi.fn() and test transforms as pure functions with known inputs and expected outputs.
Checklist
Before submitting your connector, verify:
- Implements all 4
Connectorinterface methods (authenticate,sync,getSchema,healthCheck) - Self-registers via
connectorRegistry.register() - Credentials encrypted at rest with AES-256-GCM
- No raw tokens or keys in log output
- All operations scoped to
tenantId - Default field mappings defined in
schema.ts - Transform functions are pure (no side effects)
- Rate limiting implemented for external API calls
- Zod validation on API responses (recommended)
- Tests written for auth, transform, and sync logic
-
npm testpasses -
npm run type-checkpasses
Reference Connectors
| Connector | Best Example Of |
|---|---|
| Salesforce | OAuth flow, bidirectional sync, quality pipeline, writeback |
| Customer.io | Dual-mode ingestion (webhook + API pull), HMAC verification |
| Delivr.ai | Simple API key auth, multiple data streams, buying group integration |
| Microsoft 365 | MSAL OAuth, privacy-first architecture, DPA gating, delta queries |
| Fathom.video | Rate limiting with backoff, Zod-validated responses, dual storage model |