Data Warehouse Abstraction Layer

Overview

The data warehouse abstraction lets you use any data warehouse (Snowflake, Clickhouse, PostgreSQL, BigQuery, Redshift, Databricks, etc.) without changing application code. Switching warehouses is a matter of changing a single environment variable.

Quick Start

import { inject, type Dependencies } from '../iocContainer/index.js';
import type SafeTracer from '../utils/SafeTracer.js';

class MyService {
  constructor(private readonly dataWarehouse: Dependencies['DataWarehouse']) {}
  
  async getUserData(userId: string, tracer: SafeTracer) {
    return this.dataWarehouse.query(
      'SELECT * FROM users WHERE id = :1',
      tracer,
      [userId]
    );
  }
}

export default inject(['DataWarehouse'], MyService);

Configuration

Select adapters with WAREHOUSE_ADAPTER and (optionally) ANALYTICS_ADAPTER.
Legacy deployments can keep using DATA_WAREHOUSE_PROVIDER; it is still accepted as a fallback.
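The fallback order can be sketched as a small helper. The helper name, and the behavior when neither variable is set, are assumptions for illustration; the real resolution logic lives in DataWarehouseFactory.

```typescript
// Sketch of the adapter-selection fallback described above: prefer the new
// variable, accept the legacy one, and fail loudly otherwise (assumed behavior).
function resolveWarehouseAdapter(env: Record<string, string | undefined>): string {
  const adapter = env.WAREHOUSE_ADAPTER ?? env.DATA_WAREHOUSE_PROVIDER;
  if (!adapter) {
    throw new Error('Set WAREHOUSE_ADAPTER (or legacy DATA_WAREHOUSE_PROVIDER)');
  }
  return adapter;
}
```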

Snowflake

WAREHOUSE_ADAPTER=snowflake
ANALYTICS_ADAPTER=snowflake
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=snowflake
SNOWFLAKE_ACCOUNT=your_account
SNOWFLAKE_USERNAME=user
SNOWFLAKE_PASSWORD=pass
SNOWFLAKE_DB_NAME=analytics
SNOWFLAKE_WAREHOUSE=COMPUTE_WH
SNOWFLAKE_SCHEMA=PUBLIC

PostgreSQL

WAREHOUSE_ADAPTER=postgresql
ANALYTICS_ADAPTER=postgresql
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=postgresql
DATABASE_HOST=localhost
DATABASE_PORT=5432
DATABASE_NAME=analytics
DATABASE_USER=postgres
DATABASE_PASSWORD=password

Clickhouse

WAREHOUSE_ADAPTER=clickhouse
# Optional: override analytics adapter
# ANALYTICS_ADAPTER=clickhouse
# Legacy fallback:
DATA_WAREHOUSE_PROVIDER=clickhouse
CLICKHOUSE_HOST=localhost
CLICKHOUSE_PORT=8123
CLICKHOUSE_USERNAME=default
CLICKHOUSE_PASSWORD=password
CLICKHOUSE_DATABASE=analytics
CLICKHOUSE_PROTOCOL=http

# Disable analytics writes (while keeping the warehouse)
# ANALYTICS_ADAPTER=noop

How It Works

Three Interfaces

1. IDataWarehouse - Raw SQL queries

await dataWarehouse.query('SELECT * FROM users', tracer);
await dataWarehouse.transaction(async (query) => {
  await query('UPDATE users SET score = :1', [100]);
  await query('INSERT INTO audit_log VALUES (:1)', [userId]);
});

2. IDataWarehouseDialect - Type-safe Kysely queries

const kysely = dialect.getKyselyInstance();
await kysely.selectFrom('users').selectAll().execute();

3. IDataWarehouseAnalytics - Bulk writes & logging

await analytics.bulkWrite('RULE_EXECUTIONS', [
  { ds: '2024-01-01', ts: Date.now(), org_id: 'org1', ... }
]);

How Loggers Work

All analytics loggers use the abstraction:

// server/services/analyticsLoggers/RuleExecutionLogger.ts
class RuleExecutionLogger {
  constructor(private readonly analytics: Dependencies['DataWarehouseAnalytics']) {}
  
  async logRuleExecutions(executions: any[]) {
    await this.analytics.bulkWrite('RULE_EXECUTIONS', executions);
  }
}

export default inject(['DataWarehouseAnalytics'], RuleExecutionLogger);

What happens:

  1. Service calls logger.logRuleExecutions(data)
  2. Logger calls analytics.bulkWrite('RULE_EXECUTIONS', data)
  3. For Snowflake: Buffers → Kafka → Worker → Snowflake (high-throughput)
  4. For Clickhouse: Chunked JSONEachRow inserts over HTTP (default batches of 500 rows)
  5. For PostgreSQL: Buffers → COPY or batch INSERT

No warehouse-specific code in loggers! They just call bulkWrite().

Data Flow

Snowflake (High-Throughput)

RuleExecutionLogger
    ↓
DataWarehouseAnalytics.bulkWrite()
    ↓
SnowflakeAnalyticsAdapter
    ↓
DataLoader (batches 200 rows)
    ↓
Kafka Topic: DATA_WAREHOUSE_INGEST_EVENTS
    ↓
SnowflakeIngestionWorker
    ↓
Snowflake Tables

Clickhouse/PostgreSQL (Direct)

RuleExecutionLogger
    ↓
DataWarehouseAnalytics.bulkWrite()
    ↓
ClickhouseAnalyticsAdapter
    ↓
Chunk rows (default size 500)
    ↓
HTTP JSONEachRow INSERT into Clickhouse
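The chunking step in the flow above can be sketched as a small pure helper (the default of 500 rows matches the batch size noted earlier; `chunkRows` is an illustrative name, not the adapter's actual helper):

```typescript
// Split rows into fixed-size batches before each JSONEachRow insert.
function chunkRows<T>(rows: readonly T[], size = 500): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < rows.length; i += size) {
    chunks.push(rows.slice(i, i + size));
  }
  return chunks;
}
```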

Required Tables

All warehouses need these tables; the schema types are defined in /server/storage/dataWarehouse/IDataWarehouseAnalytics.ts.

Core tables:

  • RULE_EXECUTIONS - Rule evaluation logs
  • ACTION_EXECUTIONS - Moderation action logs
  • ITEM_MODEL_SCORES_LOG - ML model prediction logs
  • CONTENT_API_REQUESTS - API request logs

ClickHouse DDL lives alongside the rest of our migrations at
.devops/migrator/src/scripts/clickhouse/. Add new files there when the schema evolves.

Migration examples:

Snowflake

CREATE TABLE RULE_EXECUTIONS (
  DS DATE,
  TS NUMBER,
  ORG_ID VARCHAR,
  RULE_ID VARCHAR,
  PASSED BOOLEAN,
  RESULT VARIANT,  -- JSON
  -- ... ~20 more fields, see IDataWarehouseAnalytics.ts
);

Clickhouse

CREATE TABLE rule_executions (
  ds Date,
  ts UInt64,
  org_id String,
  rule_id String,
  passed UInt8,
  result String,  -- JSON as string
  -- ... ~20 more fields
) ENGINE = MergeTree()
PARTITION BY ds
ORDER BY (ds, ts, org_id);

PostgreSQL

CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR(255),
  rule_id VARCHAR(255),
  passed BOOLEAN,
  result JSONB,
  -- ... ~20 more fields
) PARTITION BY RANGE (ds);

Full schema: See /server/storage/dataWarehouse/IDataWarehouseAnalytics.ts lines 23-140.

Implementing a Custom Warehouse

Step 1: Implement an IWarehouseAdapter plugin

Create a warehouse adapter under server/plugins/warehouse/adapters:

// server/plugins/warehouse/adapters/MyWarehouseAdapter.ts
import type SafeTracer from '../../../utils/SafeTracer.js';
import type { IWarehouseAdapter } from '../IWarehouseAdapter.js';
import {
  type WarehouseQueryFn,
  type WarehouseQueryResult,
  type WarehouseTransactionFn,
} from '../types.js';

export class MyWarehouseAdapter implements IWarehouseAdapter {
  readonly name = 'my-warehouse';

  constructor(private readonly client: SomeWarehouseClient, private readonly tracer?: SafeTracer) {}

  start(): void {
    // Optional: warm up connection pools
  }

  async query<T = WarehouseQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    const execute = async () => {
      const rows = await this.client.execute(sql, params);
      return rows as readonly T[];
    };

    return this.tracer
      ? (this.tracer.addActiveSpan({ resource: 'my-warehouse.query', operation: 'query' }, execute) as Promise<readonly T[]>)
      : execute();
  }

  async transaction<T>(fn: WarehouseTransactionFn<T>): Promise<T> {
    return this.client.transaction(async () => fn((statement, parameters) => this.query(statement, parameters)));
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}

Step 2: Provide an IDataWarehouseDialect (Kysely) implementation

If you need type-safe queries, create a dialect wrapper (see ClickhouseKyselyAdapter for a concrete example) and return it from DataWarehouseFactory.createKyselyDialect.
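A minimal sketch of such a wrapper, under assumptions: `KyselyLike` stands in for a real `Kysely<DB>` instance and `IDataWarehouseDialectLike` for the actual IDataWarehouseDialect interface (see ClickhouseKyselyAdapter for the real shape).

```typescript
// Hypothetical stand-ins for the real types; illustrative only.
interface KyselyLike {
  selectFrom(table: string): unknown;
}

interface IDataWarehouseDialectLike {
  getKyselyInstance(): KyselyLike;
}

// The wrapper owns a pre-configured Kysely instance and hands it to callers.
class MyWarehouseDialect implements IDataWarehouseDialectLike {
  constructor(private readonly kysely: KyselyLike) {}

  getKyselyInstance(): KyselyLike {
    return this.kysely;
  }
}
```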

Step 3: Implement an IAnalyticsAdapter plugin

Analytics adapters live under server/plugins/analytics/adapters and implement bulk writes plus optional CDC:

// server/plugins/analytics/adapters/MyAnalyticsAdapter.ts
import type { IAnalyticsAdapter } from '../IAnalyticsAdapter.js';
import {
  type AnalyticsEventInput,
  type AnalyticsQueryResult,
  type AnalyticsWriteOptions,
} from '../types.js';

export class MyAnalyticsAdapter implements IAnalyticsAdapter {
  readonly name = 'my-analytics';

  constructor(private readonly client: SomeWarehouseClient) {}

  async writeEvents(table: string, events: readonly AnalyticsEventInput[], _options?: AnalyticsWriteOptions): Promise<void> {
    if (events.length === 0) {
      return;
    }
    await this.client.insert(table, events);
  }

  async query<T = AnalyticsQueryResult>(sql: string, params: readonly unknown[] = []): Promise<readonly T[]> {
    return (await this.client.query(sql, params)) as readonly T[];
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {
    await this.client.close();
  }
}

Step 4: Register the provider in DataWarehouseFactory

Update DataWarehouseFactory.createDataWarehouse, createKyselyDialect, and createAnalyticsAdapter to instantiate your plugins. The factory wraps them in bridges so the rest of the application only speaks the generic interfaces.
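The registration can be sketched as a provider switch. The interface shape and the `MyWarehouseAdapterStub` below are illustrative stand-ins; the real interfaces live under server/plugins/warehouse/, and the real factory also wires up the dialect and analytics bridges.

```typescript
// Hypothetical minimal shape of the plugin interface; illustrative only.
interface IWarehouseAdapter {
  readonly name: string;
  query(sql: string, params?: readonly unknown[]): Promise<readonly unknown[]>;
}

class MyWarehouseAdapterStub implements IWarehouseAdapter {
  readonly name = 'my-warehouse';
  async query(): Promise<readonly unknown[]> {
    return []; // stand-in; the real adapter talks to your warehouse client
  }
}

// Mirrors the switch you would add to DataWarehouseFactory.createDataWarehouse.
function createWarehouseAdapter(provider: string): IWarehouseAdapter {
  switch (provider) {
    case 'my-warehouse':
      return new MyWarehouseAdapterStub();
    default:
      throw new Error(`Unknown WAREHOUSE_ADAPTER: ${provider}`);
  }
}
```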

Step 5: Create Analytics Tables

All warehouses need the same tables (schema in IDataWarehouseAnalytics.ts):

-- Adapt syntax for your warehouse
CREATE TABLE rule_executions (
  ds DATE,
  ts BIGINT,
  org_id VARCHAR,
  item_id VARCHAR,
  rule_id VARCHAR,
  passed BOOLEAN,
  result JSON,  -- Or VARIANT, JSONB, String depending on warehouse
  -- ... see IDataWarehouseAnalytics.ts for all ~20 fields
);

Step 6: Configure and Run

export WAREHOUSE_ADAPTER=your-warehouse
# Optional overrides
# export ANALYTICS_ADAPTER=your-warehouse
# Legacy fallback:
# export DATA_WAREHOUSE_PROVIDER=your-warehouse
export YOUR_WAREHOUSE_HOST=localhost
# ... other config vars

npm start

How Services Consume Analytics Data

Services query analytics data using DataWarehouseDialect:

// server/services/analyticsQueries/UserHistoryQueries.ts
class UserHistoryQueries {
  constructor(private readonly dialect: Dependencies['DataWarehouseDialect']) {}

  async getUserRuleExecutionsHistory(orgId: string, userId: string) {
    const kysely = this.dialect.getKyselyInstance();
    
    return kysely
      .selectFrom('RULE_EXECUTIONS')
      .where('ORG_ID', '=', orgId)
      .where('ITEM_CREATOR_ID', '=', userId)
      .selectAll()
      .execute();
  }
}

export default inject(['DataWarehouseDialect'], UserHistoryQueries);

Works with any warehouse:

  • Snowflake: Uses SnowflakeDialect
  • Clickhouse: Uses ClickhouseDialect
  • PostgreSQL: Uses PostgresDialect

Available IOC Services

Service                  Type                       Purpose
DataWarehouse            IDataWarehouse             Raw SQL, transactions
DataWarehouseDialect     IDataWarehouseDialect      Type-safe queries
DataWarehouseAnalytics   IDataWarehouseAnalytics    Bulk writes, logging

File Structure

server/storage/dataWarehouse/
├── IDataWarehouse.ts              # Core interface
├── IDataWarehouseAnalytics.ts     # Analytics interface + schema types
├── DataWarehouseFactory.ts        # Instantiates adapters via env configuration
├── ClickhouseAdapter.ts           # 📝 Stub - implement this
├── ClickhouseAnalyticsAdapter.ts  # 📝 Stub - implement this
├── PostgresAnalyticsAdapter.ts    # 📝 Stub - implement this
└── index.ts

server/plugins/warehouse/           # Pluggable warehouse adapters
├── adapters/SnowflakeWarehouseAdapter.ts
├── examples/NoOpWarehouseAdapter.ts
└── ...

server/plugins/analytics/           # Pluggable analytics adapters
├── adapters/SnowflakeAnalyticsAdapter.ts
├── examples/NoOpAnalyticsAdapter.ts
└── ...

server/services/analyticsLoggers/   # Warehouse-agnostic loggers
├── RuleExecutionLogger.ts         # Uses DataWarehouseAnalytics
├── ActionExecutionLogger.ts       # Uses DataWarehouseAnalytics
├── ItemModelScoreLogger.ts        # Uses DataWarehouseAnalytics
└── ...

server/services/analyticsQueries/   # Warehouse-agnostic queries
├── UserHistoryQueries.ts          # Uses DataWarehouseDialect
├── ItemHistoryQueries.ts          # Uses DataWarehouseDialect
└── ...

References

  • Schema types: /server/storage/dataWarehouse/IDataWarehouseAnalytics.ts
  • Snowflake warehouse adapter: /server/plugins/warehouse/adapters/SnowflakeWarehouseAdapter.ts
  • Snowflake analytics adapter: /server/plugins/analytics/adapters/SnowflakeAnalyticsAdapter.ts
  • Migration setup: /.devops/migrator/src/configs/snowflake.ts
  • Loggers: /server/services/analyticsLoggers/
  • Queries: /server/services/analyticsQueries/