
Building a Connector

This guide walks through creating a new connector for GTM Clarity. The framework provides a standard interface, an example scaffold, and patterns from five production connectors to follow.

Overview

Every connector must:

  1. Implement the Connector interface from src/connectors/interface.ts
  2. Self-register in the ConnectorRegistry
  3. Transform source data into canonical schemas (CanonicalPerson, CanonicalAccount, or CanonicalActivity)
  4. Encrypt credentials at rest using AES-256-GCM
  5. Provide default field mappings and a health check

Step 1: Copy the Example Directory

Start from the __example__ scaffold:

cp -r src/connectors/__example__ src/connectors/your-connector

This gives you two files:

File          Purpose
connector.ts  Main connector class with all interface methods stubbed
schema.ts     Default field mappings

Step 2: Implement Authentication

Create an auth.ts file for credential management. Follow the AES-256-GCM pattern used by all connectors:

// src/connectors/your-connector/auth.ts
import { randomBytes, createCipheriv, createDecipheriv } from "crypto";

const ALGORITHM = "aes-256-gcm";
const IV_LENGTH = 12; // 96-bit IV, the recommended size for GCM
const TAG_LENGTH = 16; // 128-bit authentication tag

function getEnv(name: string): string {
  const value = process.env[name];
  if (!value) throw new Error(`Missing env var: ${name}`);
  return value;
}

export function encryptApiKey(apiKey: string): string {
  const key = Buffer.from(getEnv("CONNECTOR_ENCRYPTION_KEY"), "hex");
  if (key.length !== 32) {
    throw new Error("CONNECTOR_ENCRYPTION_KEY must be 64 hex chars");
  }
  // A fresh random IV per encryption; never reuse an IV with the same key.
  const iv = randomBytes(IV_LENGTH);
  const cipher = createCipheriv(ALGORITHM, key, iv);
  const encrypted = Buffer.concat([
    cipher.update(apiKey, "utf8"),
    cipher.final(),
  ]);
  const tag = cipher.getAuthTag();
  // Stored layout: iv | tag | ciphertext, base64-encoded.
  return Buffer.concat([iv, tag, encrypted]).toString("base64");
}

export function decryptApiKey(encrypted: string): string {
  const key = Buffer.from(getEnv("CONNECTOR_ENCRYPTION_KEY"), "hex");
  if (key.length !== 32) {
    throw new Error("CONNECTOR_ENCRYPTION_KEY must be 64 hex chars");
  }
  // Split the stored value back into iv | tag | ciphertext.
  const combined = Buffer.from(encrypted, "base64");
  const iv = combined.subarray(0, IV_LENGTH);
  const tag = combined.subarray(IV_LENGTH, IV_LENGTH + TAG_LENGTH);
  const ciphertext = combined.subarray(IV_LENGTH + TAG_LENGTH);
  const decipher = createDecipheriv(ALGORITHM, key, iv);
  decipher.setAuthTag(tag);
  // final() throws if the tag does not match (tampered data or wrong key).
  return Buffer.concat([
    decipher.update(ciphertext),
    decipher.final(),
  ]).toString("utf8");
}

OAuth connectors

For OAuth-based sources (like Salesforce or O365), you will need HMAC-signed state parameters, token exchange, and refresh logic. See src/connectors/salesforce/oauth.ts or src/connectors/o365/oauth.ts for complete examples.
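
As a rough illustration of what an HMAC-signed state parameter involves, here is a minimal sketch. It is not taken from the Salesforce or O365 connectors: the function names signState and verifyState, the payload fields (tenantId, nonce, iat), the payload.signature format, and the 10-minute expiry are all assumptions made for this example.

```typescript
import { createHmac, randomBytes, timingSafeEqual } from "crypto";

// Hypothetical helpers; names, payload shape, and expiry are illustrative.
export function signState(tenantId: string, secret: string): string {
  const nonce = randomBytes(16).toString("hex");
  const payload = Buffer.from(
    JSON.stringify({ tenantId, nonce, iat: Date.now() }),
    "utf8",
  ).toString("base64url");
  const signature = createHmac("sha256", secret).update(payload).digest("hex");
  return `${payload}.${signature}`;
}

export function verifyState(
  state: string,
  secret: string,
  maxAgeMs: number = 10 * 60 * 1000,
): { tenantId: string } | null {
  const parts = state.split(".");
  if (parts.length !== 2) return null;
  const [payload, signature] = parts;

  // Recompute the HMAC and compare in constant time.
  const expected = createHmac("sha256", secret).update(payload).digest();
  const received = Buffer.from(signature, "hex");
  if (received.length !== expected.length) return null;
  if (!timingSafeEqual(received, expected)) return null;

  // Only parse the payload once the signature has been verified.
  const parsed = JSON.parse(Buffer.from(payload, "base64url").toString("utf8"));
  if (typeof parsed.tenantId !== "string" || typeof parsed.iat !== "number") {
    return null;
  }
  if (Date.now() - parsed.iat > maxAgeMs) return null; // stale state
  return { tenantId: parsed.tenantId };
}
```

The signed state is what you would pass as the state query parameter when redirecting to the provider, and verify in the callback route before exchanging the authorization code. Check the referenced oauth.ts files for the format the project actually uses.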

Step 3: Implement the Connector Class

Update connector.ts with your implementation. Here is the full interface to satisfy:

import type {
  Connector,
  ConnectorConfig,
  ConnectorHealthStatus,
  FieldMapping,
  SyncResult,
  CanonicalPerson,
  CanonicalAccount,
  CanonicalActivity,
} from "../interface";
import { yourConnectorFieldMappings } from "./schema";
import { decryptApiKey } from "./auth";
import { connectorRegistry } from "../registry";

export class YourConnector implements Connector {
  readonly id = "your-connector"; // Unique registry ID
  readonly name = "Your Connector"; // Human-readable name
  readonly version = "1.0.0";

  async authenticate(
    config: ConnectorConfig,
  ): Promise<{ credentials: Record<string, string>; expiresAt?: Date }> {
    // 1. Decrypt stored credentials
    const apiKey = decryptApiKey(config.settings.encryptedApiKey);

    // 2. Validate against the external API
    const response = await fetch("https://api.example.com/health", {
      headers: { Authorization: `Bearer ${apiKey}` },
    });
    if (!response.ok) {
      throw new Error(`Auth failed: ${response.status}`);
    }

    // 3. Return credentials (no expiresAt for non-expiring keys)
    return { credentials: { apiKey } };
  }

  async sync(
    credentials: Record<string, string>,
    lastSyncCursor: string | null,
    _options?: { fullSync?: boolean },
  ): Promise<
    SyncResult<CanonicalPerson | CanonicalAccount | CanonicalActivity>
  > {
    // Implement your sync logic here
    // See Step 4 for the sync module pattern
    return {
      records: [],
      cursor: null,
      hasMore: false,
      syncedAt: new Date(),
      stats: { created: 0, updated: 0, errors: 0 },
    };
  }

  getSchema(): FieldMapping[] {
    return yourConnectorFieldMappings;
  }

  async healthCheck(
    credentials: Record<string, string>,
  ): Promise<ConnectorHealthStatus> {
    try {
      const response = await fetch("https://api.example.com/health", {
        headers: { Authorization: `Bearer ${credentials.apiKey}` },
      });
      return {
        healthy: response.ok,
        lastSync: null,
        error: response.ok ? undefined : `HTTP ${response.status}`,
      };
    } catch (err) {
      return {
        healthy: false,
        lastSync: null,
        error: err instanceof Error ? err.message : String(err),
      };
    }
  }
}

// Self-register -- this line is REQUIRED
connectorRegistry.register(new YourConnector());

Self-registration

The connectorRegistry.register() call at the end of the file is mandatory. Without it, the connector will not appear in connectorRegistry.list() or be discoverable at runtime.
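
To make the mechanism concrete, here is a minimal sketch of how a registry of this kind could work. It is a hypothetical stand-in, not the project's ConnectorRegistry: the RegisteredConnector type, the get() method, and the duplicate-ID error are assumptions; only register() and list() are named in this guide.

```typescript
// Hypothetical sketch; the real ConnectorRegistry may differ.
interface RegisteredConnector {
  readonly id: string;
  readonly name: string;
  readonly version: string;
}

class ConnectorRegistry {
  private readonly connectors = new Map<string, RegisteredConnector>();

  register(connector: RegisteredConnector): void {
    // Assumption: duplicate IDs are rejected rather than overwritten.
    if (this.connectors.has(connector.id)) {
      throw new Error(`Connector already registered: ${connector.id}`);
    }
    this.connectors.set(connector.id, connector);
  }

  get(id: string): RegisteredConnector | undefined {
    return this.connectors.get(id);
  }

  list(): RegisteredConnector[] {
    return Array.from(this.connectors.values());
  }
}

export const connectorRegistry = new ConnectorRegistry();

// Module-level side effect: runs once, the first time the module is imported.
connectorRegistry.register({
  id: "your-connector",
  name: "Your Connector",
  version: "1.0.0",
});
```

Because registration is a module-level side effect, it only runs if something imports the connector file. If your connector is missing from connectorRegistry.list(), check that its module is actually imported at startup.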

Step 4: Add Sync Logic

Create a sync/ subdirectory with pull.ts and transform.ts files.

pull.ts -- Fetch from External API

Follow the async generator pattern used by all connectors:

// src/connectors/your-connector/sync/pull.ts

export interface YourApiEvent {
  id: string;
  type: string; // read by transformEvent, e.g. "page_view"
  userId: string; // read by transformEvent as personExternalId
  timestamp: string;
  // ... source-specific fields
}

const PAGE_SIZE = 100;
const DEFAULT_BACKFILL_DAYS = 90;
const MS_PER_DAY = 86400000;

export async function* pullEvents(
  apiKey: string,
  since: Date | null,
): AsyncGenerator<YourApiEvent[]> {
  // With no cursor, backfill the default window.
  const sinceIso = since
    ? since.toISOString()
    : new Date(Date.now() - DEFAULT_BACKFILL_DAYS * MS_PER_DAY).toISOString();

  // Encode the timestamp so ":" and "." are safe in the query string.
  let nextUrl: string | null =
    `https://api.example.com/v1/events?since=${encodeURIComponent(sinceIso)}&limit=${PAGE_SIZE}`;

  while (nextUrl) {
    const response = await fetch(nextUrl, {
      headers: { Authorization: `Bearer ${apiKey}` },
    });

    if (!response.ok) {
      throw new Error(`API failed: ${response.status}`);
    }

    const data = (await response.json()) as {
      events?: YourApiEvent[];
      next?: string | null;
    };
    // Yield one page at a time; skip empty pages.
    if (data.events && data.events.length > 0) {
      yield data.events;
    }
    nextUrl = data.next ?? null;
  }
}

Rate limiting

If your source API has rate limits, add pacing (delay between requests) and 429 handling with exponential backoff. See src/connectors/fathom/sync/pull.ts for a robust example with Retry-After header support.
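
The 429 handling described above can be sketched as follows. This is not the Fathom connector's code: the names retryDelayMs and fetchWithBackoff, the 500 ms base delay, the 30 s cap, and the retry limit are assumptions for illustration. The sketch only handles the delay-in-seconds form of Retry-After, not the HTTP-date form, and takes the fetch function as a parameter so it can be tested without network access.

```typescript
// Minimal response shape, so the sketch does not depend on a specific fetch implementation.
interface HttpResponse {
  ok: boolean;
  status: number;
  headers: { get(name: string): string | null };
}

type Fetcher = (url: string) => Promise<HttpResponse>;
type Wait = (ms: number) => Promise<void>;

const sleep: Wait = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Prefer the server's Retry-After (seconds); otherwise back off exponentially.
export function retryDelayMs(
  attempt: number,
  retryAfter: string | null,
  baseMs: number = 500,
  maxMs: number = 30000,
): number {
  const trimmed = retryAfter === null ? "" : retryAfter.trim();
  const seconds = trimmed === "" ? NaN : Number(trimmed);
  if (Number.isFinite(seconds) && seconds >= 0) {
    return Math.min(seconds * 1000, maxMs);
  }
  return Math.min(baseMs * Math.pow(2, attempt), maxMs);
}

// Retries only on HTTP 429; any other response is returned to the caller.
export async function fetchWithBackoff(
  url: string,
  fetcher: Fetcher,
  maxRetries: number = 5,
  wait: Wait = sleep,
): Promise<HttpResponse> {
  for (let attempt = 0; ; attempt++) {
    const response = await fetcher(url);
    if (response.status !== 429 || attempt >= maxRetries) return response;
    await wait(retryDelayMs(attempt, response.headers.get("retry-after")));
  }
}
```

Inside pullEvents you could pass a fetcher such as (url) => fetch(url, { headers }) so every page request goes through the same retry path. Adding random jitter to the computed delay is a common refinement when many tenants sync at once.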

transform.ts -- Map to Canonical Schema

Transform functions must be pure -- no side effects, no API calls, no database access:

// src/connectors/your-connector/sync/transform.ts
import type { CanonicalActivity } from "@/connectors/interface";
import type { YourApiEvent } from "./pull";

const EVENT_TYPE_MAP: Record<string, { type: string; channel: string }> = {
  page_view: { type: "page_view", channel: "web" },
  form_submit: { type: "form_submit", channel: "web" },
};

export function transformEvent(
  event: YourApiEvent,
): CanonicalActivity | null {
  const mapping = EVENT_TYPE_MAP[event.type];
  if (!mapping) return null; // Skip unmapped types

  return {
    externalId: event.id,
    type: mapping.type,
    channel: mapping.channel,
    occurredAt: new Date(event.timestamp),
    personExternalId: event.userId,
    properties: {
      source: "your-connector",
      // ... additional metadata
    },
  };
}
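
One way to wire pull.ts and transform.ts into the sync() method from Step 3 is sketched below. The helper name collectSync is hypothetical, and the SyncResult shape is copied from the stub return value in Step 3 rather than from src/connectors/interface.ts. Two further assumptions: the cursor is an ISO timestamp taken when the sync started, and every transformed record is counted as created, since the created/updated split is only known once records are persisted.

```typescript
// Local copies of the result types, inferred from the Step 3 stub (assumption).
interface SyncStats {
  created: number;
  updated: number;
  errors: number;
}

interface SyncResult<T> {
  records: T[];
  cursor: string | null;
  hasMore: boolean;
  syncedAt: Date;
  stats: SyncStats;
}

// Drains an async generator of pages, transforming each item with a pure function.
export async function collectSync<TSource, TRecord>(
  pages: AsyncIterable<TSource[]>,
  transform: (item: TSource) => TRecord | null,
  now: () => Date = () => new Date(),
): Promise<SyncResult<TRecord>> {
  // Capture the start time first so events arriving mid-sync fall after the cursor.
  const startedAt = now();
  const records: TRecord[] = [];
  let errors = 0;

  for await (const page of pages) {
    for (const item of page) {
      try {
        const record = transform(item);
        if (record !== null) records.push(record); // null means "skip unmapped"
      } catch {
        errors++; // one bad record should not abort the whole sync
      }
    }
  }

  return {
    records,
    cursor: startedAt.toISOString(),
    hasMore: false,
    syncedAt: now(),
    stats: { created: records.length, updated: 0, errors },
  };
}

// Inside sync(), usage might look like:
//   const since = lastSyncCursor ? new Date(lastSyncCursor) : null;
//   return collectSync(pullEvents(credentials.apiKey, since), transformEvent);
```

This version buffers every record in memory before returning. For large backfills you may prefer to return after a fixed number of pages with hasMore set to true, if the framework supports resuming from the cursor.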

Step 5: Define Field Mappings

Update schema.ts with your default field mappings:

// src/connectors/your-connector/schema.ts
import type { FieldMapping } from "../interface";

export const yourConnectorFieldMappings: FieldMapping[] = [
  {
    sourceField: "user_id",
    targetField: "personExternalId",
    transform: "none",
    required: true,
  },
  {
    sourceField: "email",
    targetField: "email",
    transform: "lowercase",
    required: false,
  },
  {
    sourceField: "event_type",
    targetField: "type",
    transform: "none",
    required: true,
  },
  {
    sourceField: "timestamp",
    targetField: "occurredAt",
    transform: "none",
    required: true,
  },
];

Available transforms: none, lowercase, uppercase, trim, custom.
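
To show how mappings like these might be consumed, here is a minimal sketch of applying them to a raw source record. The function applyFieldMappings is hypothetical and not part of the framework described here. The FieldMapping shape is inferred from the example above, and the way custom transforms are resolved (a lookup keyed by source field name) is an assumption.

```typescript
type Transform = "none" | "lowercase" | "uppercase" | "trim" | "custom";

// Shape inferred from the mapping objects in schema.ts (assumption).
interface FieldMapping {
  sourceField: string;
  targetField: string;
  transform: Transform;
  required: boolean;
}

type CustomTransforms = Record<string, (value: unknown) => unknown>;

function applyTransform(
  value: unknown,
  mapping: FieldMapping,
  custom: CustomTransforms,
): unknown {
  if (mapping.transform === "custom") {
    // Assumption: custom transforms are looked up by source field name.
    const fn = custom[mapping.sourceField];
    return fn ? fn(value) : value;
  }
  if (typeof value !== "string") return value; // string transforms only
  switch (mapping.transform) {
    case "lowercase":
      return value.toLowerCase();
    case "uppercase":
      return value.toUpperCase();
    case "trim":
      return value.trim();
    default:
      return value;
  }
}

// Hypothetical helper: builds a target record from a source record.
export function applyFieldMappings(
  source: Record<string, unknown>,
  mappings: FieldMapping[],
  custom: CustomTransforms = {},
): Record<string, unknown> {
  const target: Record<string, unknown> = {};
  for (const mapping of mappings) {
    const value = source[mapping.sourceField];
    if (value === undefined || value === null) {
      if (mapping.required) {
        throw new Error(`Missing required field: ${mapping.sourceField}`);
      }
      continue; // optional field absent: leave it out of the target
    }
    target[mapping.targetField] = applyTransform(value, mapping, custom);
  }
  return target;
}

export const exampleMappings: FieldMapping[] = [
  { sourceField: "user_id", targetField: "personExternalId", transform: "none", required: true },
  { sourceField: "email", targetField: "email", transform: "lowercase", required: false },
];
```

With these mappings, a source record with user_id "u1" and email "A@B.com" maps to personExternalId "u1" and email "a@b.com", while a record without user_id is rejected.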

Step 6: Add API Routes

Create OAuth or webhook routes in the Next.js API directory:

src/app/api/connectors/your-connector/
  callback/route.ts   # OAuth callback (if applicable)
  webhook/route.ts    # Webhook endpoint (if applicable)

For webhook-based connectors, implement signature verification. See the Customer.io webhook handler for the HMAC-SHA256 pattern:

import { createHmac, timingSafeEqual } from "crypto";

export function verifySignature(
  body: Buffer,
  signature: string,
  secret: string,
): boolean {
  const expected = createHmac("sha256", secret)
    .update(body)
    .digest("hex");
  const received = Buffer.from(signature, "utf8");
  const expectedBuf = Buffer.from(expected, "utf8");
  // timingSafeEqual throws on unequal byte lengths, so compare the
  // buffers' lengths (not the strings') before calling it.
  if (received.length !== expectedBuf.length) return false;
  return timingSafeEqual(received, expectedBuf);
}
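
The sketch below shows how a webhook handler might use this check. It is framework-agnostic and hypothetical: handleWebhook, its return shape, and the status codes are assumptions, and it repeats a hex-signature check inline only so the block is self-contained. The point it illustrates is ordering: verify the signature against the raw request bytes first, and parse the JSON only after the check passes.

```typescript
import { createHmac, timingSafeEqual } from "crypto";

// Same HMAC-SHA256 hex comparison as above, repeated to keep the sketch self-contained.
function verifySignature(body: Buffer, signature: string, secret: string): boolean {
  const expected = Buffer.from(
    createHmac("sha256", secret).update(body).digest("hex"),
    "utf8",
  );
  const received = Buffer.from(signature, "utf8");
  if (received.length !== expected.length) return false;
  return timingSafeEqual(received, expected);
}

interface WebhookResult {
  status: number;
  payload?: unknown;
}

// Hypothetical handler core; a route file would read the raw body and the
// provider's signature header, then call this.
export function handleWebhook(
  rawBody: Buffer,
  signatureHeader: string | null,
  secret: string,
): WebhookResult {
  // 1. Reject unsigned or incorrectly signed requests before touching the body.
  if (!signatureHeader || !verifySignature(rawBody, signatureHeader, secret)) {
    return { status: 401 };
  }
  // 2. Parse only after the signature check has passed.
  try {
    return { status: 200, payload: JSON.parse(rawBody.toString("utf8")) };
  } catch {
    return { status: 400 };
  }
}
```

The signature must be computed over the exact bytes received. Re-serializing a parsed body can reorder keys or change whitespace, which changes the HMAC. The header name and signature encoding vary by provider, so check the source's webhook documentation.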

Step 7: Add Temporal Activities (Optional)

If your connector needs scheduled syncs or long-running operations, add Temporal activities:

// src/connectors/temporal/your-connector-activities.ts
import { YourConnector } from "../your-connector/connector";

export async function syncYourConnector(
  tenantId: string,
  connectorId: string,
): Promise<void> {
  const connector = new YourConnector();
  // ... authenticate, sync, persist
}

Register the activities in src/connectors/temporal/worker.ts.

Step 8: Write Tests

Create tests in the tests/ directory:

tests/connectors/your-connector/
  auth.test.ts        # Encryption/decryption
  connector.test.ts   # authenticate, sync, healthCheck
  transform.test.ts   # Pure transform functions
  pull.test.ts        # API pull with mocked fetch

Test-driven development

The project follows TDD London School (mock-first). Mock external API calls with Vitest's vi.fn(), and test transforms as pure functions with known inputs and expected outputs.
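
As a rough sketch of the mock-first approach for pull.test.ts, the example below tests a paginated pull against a hand-rolled stub instead of the network. Everything here is illustrative: pullPages is a simplified stand-in for pullEvents that takes its fetch function as a parameter, and createFetchStub plays the role that vi.fn() would play in a real test file. In the project you would express the checks with the test runner's own assertions.

```typescript
interface Page {
  events: { id: string }[];
  next: string | null;
}

type FetchJson = (url: string) => Promise<Page>;

// Simplified stand-in for pullEvents, with the fetch function injected.
async function* pullPages(
  firstUrl: string,
  fetchJson: FetchJson,
): AsyncGenerator<{ id: string }[]> {
  let nextUrl: string | null = firstUrl;
  while (nextUrl) {
    const page: Page = await fetchJson(nextUrl);
    if (page.events.length > 0) yield page.events;
    nextUrl = page.next;
  }
}

// Hand-rolled stub: replays canned pages and records which URLs were requested.
function createFetchStub(pages: Record<string, Page>) {
  const calls: string[] = [];
  const fetchJson: FetchJson = async (url) => {
    calls.push(url);
    const page = pages[url];
    if (!page) throw new Error(`Unexpected URL: ${url}`);
    return page;
  };
  return { fetchJson, calls };
}

export async function runPullTest(): Promise<string[]> {
  const stub = createFetchStub({
    "/events?page=1": { events: [{ id: "a" }], next: "/events?page=2" },
    "/events?page=2": { events: [{ id: "b" }], next: null },
  });

  const ids: string[] = [];
  for await (const batch of pullPages("/events?page=1", stub.fetchJson)) {
    for (const event of batch) ids.push(event.id);
  }

  // The pull should follow the "next" link exactly once, then stop.
  if (stub.calls.length !== 2) {
    throw new Error(`Expected 2 requests, got ${stub.calls.length}`);
  }
  return ids;
}
```

Known pages go in, an exact list of IDs and request count come out, and no real HTTP call is made, which keeps the test fast and deterministic.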

Checklist

Before submitting your connector, verify:

  • Implements all 4 Connector interface methods (authenticate, sync, getSchema, healthCheck)
  • Self-registers via connectorRegistry.register()
  • Credentials encrypted at rest with AES-256-GCM
  • No raw tokens or keys in log output
  • All operations scoped to tenantId
  • Default field mappings defined in schema.ts
  • Transform functions are pure (no side effects)
  • Rate limiting implemented for external API calls
  • Zod validation on API responses (recommended)
  • Tests written for auth, transform, and sync logic
  • npm test passes
  • npm run type-check passes

Reference Connectors

Connector      Best Example Of
Salesforce     OAuth flow, bidirectional sync, quality pipeline, writeback
Customer.io    Dual-mode ingestion (webhook + API pull), HMAC verification
Delivr.ai      Simple API key auth, multiple data streams, buying group integration
Microsoft 365  MSAL OAuth, privacy-first architecture, DPA gating, delta queries
Fathom.video   Rate limiting with backoff, Zod-validated responses, dual storage model