reverse engineering reverse ETL & ETL, sort of.

Preface

The idea here is to build a working architecture from first principles. i'm still learning on how to build applications without the obvious bottlenecks, and this is an attempt at that. there’s benefits and tradeoffs to what I’ve built, and I’ve tried discussing them out below.

What’re we building?

Let’s understand what parts of a Reverse ETL and ETL system we’re trying to build.

The idea is simple: serving use-cases of a CDP or a traditional ETL/Reverse ETL platform, which would involve fetching data from different sources (cloud apps, or websites for event tracking) - and sending these events to data warehouses like Snowflake (ETL), and activating this data in the warehouse into customer platforms like Salesforce, Braze, etc (RETL), and understanding the engineering nuances to be able to do so.

This is the UI I built, consisting of sources, destinations, and pipelines to stream data.

Parts we’re building:

Reverse ETL

  • Extract: Pull data from the data warehouse or data lake.

  • Load: Push data back into applications like Salesforce, Braze, etc.

  • Use Case: Activate data for real-time marketing, CRM, and sales activities.

ETL

  • Extract: Pull data from various sources like cloud apps, websites, and databases.

  • Load: Store transformed data into a data warehouse like Snowflake.

  • Use Case: Centralize data for analysis, business intelligence, etc.

Some companies which are really popular for different parts of this:

  • Segment (CDP)

  • Hightouch (Reverse ETL)

  • Rudderstack (CDP)

Reverse ETL

The idea behind reverse ETL is to help activate data that is present in a warehouse like Snowflake, Redshift, or any others. For instance, companies hold tons of user related data in these warehouses, data that might be very useful for running personalized ads, campaigns, and more, and would help marketers, and business teams make better decisions. The benefit of reverse ETL is that you can sync all your data within the warehouse, form audiences within the warehouse, and be warehouse-first. This approach allows a company’s single point of storage, its warehouse, to maximize its functionality, and when coupled with a Reverse ETL platform, these audiences formed within the warehouse can be sent to the right platforms like salesforce for data activation.

How does Reverse ETL work, in code?

The idea is simple, sync data from data warehouses like Snowflake into customer third-party platforms, where we could activate data from. But there are several challenges that come when working with reverse ETL. Before we get into challenges, let’s fundamentally understand what we’re trying to achieve with snowflake. We will consider snowflake as an example for our data warehouse here, but a similar concept can be applied across warehouses.

Since we want to get data from snowflake tables, the simplest way to think of it is us using Snowflake’s APIs to fetch table data. But the question is, do we send all the data every single time we run a sync with the Snowflake API? No.

This is because we need to avoid duplicate data going to customer platforms. Simply put, it’s not efficient, and it will create more complications than solutions for business teams who might use the data being ingested into salesforce, for instance. This means we need to create diffs, where diffs represent the change in the table data since the previous sync we ran.

CDC (Change Data Capture) in Snowflake

The naive approach - Timestamps

The simplest approach we could think of is to query data based off the timestamp (updated_at) field in the table. This approach entails that we could store previous sync’s timestamp somewhere (could be Redis, could be NATS KV), and use this previous sync’s timestamp which we got from the latest updated_at field from the snowflake table the last time we ran the sync to get the newly changed rows in the table.

The issues with a timestamps based approach

There is no guarantee that there would be an updated_at column in every snowflake table, since tables are defined by the user themselves. In a case, where the updated_at column does not exist, our approach entirely fails. In theory, we could gain write access from the user for the snowflake table and create the updated_at column on every table there is, but the users query which ingests data into snowflake still wouldn’t have that column, and even if we defaulted the column to the current_timestamp on operations on the table, at scale, we're increasing the volume of data in each table by a lot more.

The compute intensive approach - Diffs on the server

The slightly better approach would be to do diffs on the server itself. This would entail storing the previous sync’s data, comparing it to the new sync’s data, and filtering out the changes, and then sending the data forward to the destination. This could be done using a diff file, which would help looking for changes with a comparison between the previous and new sync, using a primary key (unique).

  • If the primary key on a new record does not exist in the diff file, this is a potential insert into the table, and can be sent to the destination.

  • If the primary key exists, we can compare the changes between columns.

  • If the primary key does not exist in the sync but in the diff file, this is a potential deletion from the table.

The issues with diffs on the server

  1. Compute Intensive: This approach, while functional, is very compute intensive at scale. As one could imagine, for small data syncs, this approach works great, since we could quickly within seconds run a comparison between the new sync of data and the diff file, and figure out the records to send to the destination. However, for larger companies with potentially millions of rows of data, this can be very expensive, and might entail several minutes of comparison before the data reaches the destination.

  2. Data Transformations running on a cron job: Most companies, once raw data is ingested into the warehouse run data transformations to clean, filter, etc. data that enters the warehouse. These jobs are usually run in batch as a cron job which could range from every hour to every day. The data we’d ideally sync to destinations is this transformed data, and if the data is only being transformed every hour, our syncs cannot be at a faster rate than the hour itself, since there would be no diff in the data each time we run the sync in between the hour.

The Optimal Approach - Dynamic tables in Snowflake

Looking at the two approaches, what we’re now looking for is a way to reduce compute costs, and get perform faster syncs.

Luckily, several warehouses have introduced the concept of something called dynamic tables (termed differently in other warehouses), where all companies do is write SQL queries for transformations on their data, and as raw data flows into the designated tables, the warehouse itself handles running the transformation, and sending data into these dynamic tables. What this enables is us now just observing changes streaming in on this table, and as new changes come in, we send it to the destination.

The benefits of dynamic tables

  1. Non-compute Intensive: This approach reduces compute costs significantly, since we now handle diffs within the warehouse itself using dynamic tables in snowflake, and other versions of it in other warehouses.

  2. No need to run data transformations on a cron job: With dynamic tables in place, the changes would dynamically land in our transformed data tables as raw data comes in, eliminating the need for transformations on a cron job.

Coded out implementation for warehouse diffs:

Everytime a user says that they want to sync data from snowflake, there’s a choice to either do diffs on the server, or within the warehouse.

Server based diffs

  • The initial sync runs and gets all data from snowflake tables.

  • All data is transformed into a format (JSON), and stored in a diff file.

  • The next sync is performed to fetch all data again, and this fetched data is compared against the data in the diff file. Once the comparison is complete, appropriate records are sent forward, and this sync’s data is stored into the diff file, which is now updated.

  • This process repeats for every sync on a cron, possibly defined by the user.

In-warehouse diffs (requires write access to snowflake)

  • The initial sync runs and gets all the snowflake tables that we need to send data from downstream.

  • Creates a dynamic table against each of these tables with a minimum target lag of 1 minute (ideally this would be the table that companies would have which would be running transformations from raw tables).

  • Attaches a stream to these dynamic tables to listen to all changes.

  • We then query the stream every target lag time, and retrieve all changes made, ack all changes in the stream, and the stream is the now listening for new changes entering into the dynamic table.

Step 1: Retrieve all tables we want to get data from

Step 2: Loop through all tables we need to fetch data from, and create dynamic tables on it to monitor changes within snowflake:

for _, tableName := range tables {
        if s.isDynamicTable(tableName) {
            fmt.Printf("Skipping dynamic table: %s\n", tableName)
            continue
        }

        deleteQuery := fmt.Sprintf(deleteStreamQuery, fmt.Sprintf("%s.PUBLIC.%s_STREAM", s.DbName, tableName))
        _, err := s.conn.Exec(deleteQuery)
        if err != nil {
            return fmt.Errorf("failed to delete stream: %v", err)
        }

        streamQuery := fmt.Sprintf(streamOnDynamicTableQuery, fmt.Sprintf("%s.PUBLIC.%s", s.DbName, fmt.Sprintf("%s_STREAM", tableName)), fmt.Sprintf("%s.PUBLIC.%s", s.DbName, fmt.Sprintf("%s_DYNAMIC", tableName)))
        _, err = s.conn.Exec(streamQuery)
        if err != nil {
            return fmt.Errorf("failed to create stream: %v", err)
        }
    }

Step 3:

On the next sync of data after an interval, read the streams we created for all tables managed by the platform. Read all changes that exist in the stream, acknowledge them, and send these changes downstream.

for _, tableName := range tables {
        if s.isDynamicTable(tableName) {
            fmt.Printf("Skipping dynamic table: %s\n", tableName)
            continue
        }
        streamName := fmt.Sprintf("%s.PUBLIC.%s_STREAM", s.DbName, tableName)
        query := fmt.Sprintf("SELECT * FROM %s", streamName)
        rows, err := s.conn.QueryContext(ctx, query)
        if err != nil {
            return fmt.Errorf("failed to query stream %s: %v", streamName, err)
        }
        defer rows.Close()

        columns, err := rows.Columns()
        if err != nil {
            return fmt.Errorf("failed to get columns for stream %s: %v", streamName, err)
        }

        values := make([]interface{}, len(columns))
        scanArgs := make([]interface{}, len(columns))
        for i := range values {
            scanArgs[i] = &values[i]
        }

        var batchSize = 100
        var batch [][]byte

        for rows.Next() {
            if err := rows.Scan(scanArgs...); err != nil {
                return fmt.Errorf("failed to scan row: %v", err)
            }

            record := make(map[string]interface{})
            for i, col := range columns {
                val := values[i]
                record[col] = val
            }

            jsonData, err := json.Marshal(record)
            if err != nil {
                return fmt.Errorf("failed to marshal record: %v", err)
            }

            batch = append(batch, jsonData)

            if len(batch) >= batchSize {
                if err := sendBatch(pipelineID, batch, s.js, ctx); err != nil {
                    return err
                }
                batch = nil
            }
        }

        if len(batch) > 0 {
            if err := sendBatch(pipelineID, batch, s.js, ctx); err != nil {
                return err
            }
        }

        if err = rows.Err(); err != nil {
            return fmt.Errorf("error during row iteration: %v", err)
        }

        err = s.handleStreamDeletionAndRecreation()
        if err != nil {
            return err
        }
    }

In an ideal scenario, this lag time would be <1 minute, and we would be constantly listening to the stream in snowflake and not performing repeated syncs, so that we have near real-time data syncs from the warehouse, with very less compute costs on our end.

After retrieving the data, we would publish this data into our data streaming NATS service, which with NATS Jetstream, coupled with NATS KV helps our data reach the designated output destination from snowflake, and this would repeat on every data sync.


ETL:

How does ETL work, in code? Extracting data from Cloud Apps, with diffs

Cloud apps are generally speaking third party platforms that could hold any type of data a company would want, which could range from customer data, to sales data, to customer event actions, or tracking data with PostHog, etc.

This is the UI I built, consisting of sources, destinations, and pipelines to stream data.

Hence, the idea is simple, sync data from these platforms into warehouses like Snowflake, which are traditional storage points for most large companies.

Since we want to get data from these sources, the simplest way to think of it is us using the app’s APIs to fetch data. But the question is, do we send all the data every single time we run a sync with the Snowflake API? No.

This is because we need to avoid duplicate data going into warehouses, since it can affect data transformations that happen within the warehouse, and significantly increase how much data is held in the warehouse, increase costs as well. Simply put, it’s not efficient, and it will create more complications than solutions. This means we need to create diffs, where diffs represent the change in the data since the previous sync we ran.

For ETL, since we’re extracting data from cloud apps, several cloud apps provide a way to work with timestamps from their API for the generic types of event data. This would mean us querying the API with a timestamp as a query parameter to get all the data since that timestamp, and using timestamp fields from the data retrieved stored as state (this will become more obvious in the code block below), and using that state in the next sync we run for the data.

Example by fetching MongoDB data

Let’s say our source was MongoDB organizational events, and we’re using the MongoDB API to pull org event data.

func (m *MongoDB) Run(ctx context.Context, pipelineID int64, js jetstream.JetStream) error {
    currentSyncTime := time.Now().UTC()

    projIds, err := m.client.GetProjsIds(ctx)
    if err != nil {
        return fmt.Errorf("failed to get project IDs: %w", err)
    }

    if m.state == nil {
        m.state = &InputState{
            LastSyncTime: currentSyncTime.Add(-10000 * time.Hour),
        }
    }

    for _, id := range projIds {
        pageNumber := 1
        for {
            events, err := m.client.GetProjEventsByPage(ctx, id, m.state.LastSyncTime, pageNumber)
            if err != nil {
                return fmt.Errorf("failed to get events for project %s: %w", id, err)
            }

            if len(events.Results) == 0 {
                break
            }

            var allEvents [][]byte
            for _, event := range events.Results {
                eventBytes, err := json.Marshal(event)
                if err != nil {
                    return fmt.Errorf("failed to marshal event: %w", err)
                }
                allEvents = append(allEvents, eventBytes)
            }

            destRecord := map[string]interface{}{
                "pipeline_id": pipelineID,
                "records":     allEvents,
            }
            destRecordBytes, err := json.Marshal(destRecord)
            if err != nil {
                return fmt.Errorf("failed to marshal event: %w", err)
            }
            ack, err := js.Publish(ctx, "OUTPUT", destRecordBytes)
            if err != nil {
                return fmt.Errorf("failed to publish message to OUTPUT subject: %w", err)
            }

            log.Printf("Message published to OUTPUT subject. Ack: Stream=%s, Seq=%d", ack.Stream, ack.Sequence)
            pageNumber++
        }
    }

    m.state.LastSyncTime = currentSyncTime
    return nil
}

Here, we retrieve project ids, paginate on project events, retrieve them from the API, and publish them to a NATS stream subject called “OUTPUT”, which would have a listener, and would then send data to the designated output.

Dealing with timestamps

Currently, we just use the timestamp we queried the API on as our state, but that’s not ideal, and can cause record duplications, since the current timestamp we store in state, and the latest timestamp that may exist on the JSON record could be with different precisions, and different times altogether.

This is what a MongoDB org events response looks like:

{
  "alertConfigId": "{alertConfigId}",
  "alertId": "{alertId}",
  "clusterId": "{clusterId}",
  "clusterName": "Test Cluster",
  "created": "2018-06-11T12:34:56Z",
  "currentValue": {
    "number": 50,
    "units": "RAW"
  },
  "eventTypeName": "OUTSIDE_METRIC_THRESHOLD",
  "groupId": "{groupId}",
  "hostId": "{hostId}",
  "hostname": "db.example.com",
  "id": "{globalAlertId}",
  "isGlobalAdmin": false,
  "maintenanceWindowId": "{maintenanceWindowId}",
  "metricName": "OPCOUNTER_CMD",
  "orgId": "{orgId}",
  "port": 27017,
  "remoteAddress": "192.168.1.1",
  "replicaSetName": "rs1",
  "shardName": "sh1",
  "userId": "{userId}",
  "username": "john.doe@example.com",
  "targetUsername": "jane.doe@example.com",
  "teamId": "{teamId}"
}

Let’s say we pulled 50 organizational events in our first sync, all returned in asc order, with the latest timestamp on the record being:

"created": "2018-06-11T12:34:56Z"

This timestamp, which is retrieved from the record itself is what should stored as state, so that the next time we query the API, we would use this timestamp as a query parameter to get all the records since that timestamp.

The nuance to keep in mind with this approach:

  • Timestamp precision: making sure that we’re handling precision of timestamps effectively is very important, since not doing so could create duplication of records.

  • For example, if the API returns records to the second, but we are storing state to the milisecond as returned by the API responses, we could have duplicate records produced since the API is working off the second, but we’re managing state to the milisecond.

Currently, we’re storing state locally in-memory, but ideally we’d store state in a more persistent manner, specifically by utilizing something like NATS KV to store the timestamp state from one sync, and using that timestamp stored in the next sync to get all records since that timestamp.

Ingesting data into destinations

For ETL, this usually means ingesting data into platforms meant for data storage meant for large amounts of data like Elasticsearch, and for Reverse ETL this would mean any other customer platform, and the one that I built out was called Algolia.

Sending to destinations - Reverse ETL

The idea here is to have a consumer to our NATS stream, and send the data to the designated output.

_, err = destinationsConsumer.Consume(func(msg jetstream.Msg) {
    msg.Ack()
    err := destinations.HandleSendingToDestination(msg.Data(), df.db, df.kv)
    if err != nil {
        panic(err)
    }
})

func HandleSendingToDestination(recordsToSendToDestination []byte, db *db.DB, kv jetstream.KeyValue) error {
    var destinationRecord nats.DestinationRecord
    err := json.Unmarshal(recordsToSendToDestination, &destinationRecord)
    if err != nil {
        return err
    }
    val, err := kv.Get(context.Background(), fmt.Sprintf("%s-destination", strconv.FormatInt(destinationRecord.PipelineID, 10)))
    if err != nil {
        return fmt.Errorf("failed to get value from KV store: %v", err)
    }

    valueString := string(val.Value())

    intValue, err := strconv.ParseInt(valueString, 10, 64)
    if err != nil {
        return fmt.Errorf("failed to parse value from KV store: %v", err)
    }

    destination, err := db.GetDestinationById(context.Background(), intValue)
    if err != nil {
        return err
    }

    destinations := integrations.FetchDestinations()
    destinationToRun := destinations[destination.DestinationType]

    var destConfig map[string]interface{}
    err = json.Unmarshal(destination.Config, &destConfig)
    if err != nil {
        return err
    }

    destinationToRun.Initialize(destConfig)

    destinationToRun.Run(destinationRecord)

    return nil
}

What we’re doing here is, ack-ing any message that comes to the subject from the NATS stream, and calling a HandleSendingToDestination function which takes in the message that came in, which would consist of all the records that need to be ingested, and the pipeline id. And we would use the pipeline ID coupled with NATS KV to figure out what destination to send data to (elasticsearch, algolia, etc.), and call the run method off the destination interface which looks like the below:

type Destination interface {
    Initialize(config map[string]interface{}) error
    DestinationID() string
    Run(d nats.DestinationRecord) error
}

The below is the Run method implementation of sending data to Algolia, one of many customer platforms we could send data to.

func (a *Algolia) Run(r nats.DestinationRecord) error {
    log.Printf("Processing record with PipelineID: %d", r.PipelineID)

    for _, recordData := range r.Records {
        var document map[string]interface{}
        err := json.Unmarshal(recordData, &document)
        if err != nil {
            log.Printf("Failed to unmarshal record: %s", err)
            continue
        }

        if _, exists := document["objectID"]; !exists {
            document["objectID"] = fmt.Sprintf("%s-%d", algoliaID, r.PipelineID)
        }

        _, err = a.index.SaveObject(document)
        if err != nil {
            log.Printf("Failed to index document in Algolia: %s", err)
            continue
        }

        log.Printf("Successfully indexed document in Algolia: %v", document)
    }

    return nil
}

The Nuance

  • Algolia is relatively simple to ingest data into, but we need to keep in mind that several platforms come with their own set of rules and regulations with the API with regards to rate limits, data formats (for example: there are platforms that only may support CSV ingestion, or a JSON blob that needs to have the appropriate keys, etc).

Sending to destinations - ETL

Similarly for ETL, we still listen to messages with the NATS consumer, and eventually call the Run method. The change between ETL and Reverse ETL comes in the nuances of the implementation of the Run method.

The below is the Run method implementation of sending data to Elasticsearch, one of many data storage platforms we could send data to.

func (e *ElasticSearch) Run(record nats.DestinationRecord) error {
    ctx := context.Background()
    for _, recordBytes := range record.Records {
        err := e.bulkIndex.Add(
            ctx,
            esutil.BulkIndexerItem{
                Action: "index",
                Body:   bytes.NewReader(recordBytes),
                OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {
                },
                OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
                    if err != nil {
                        fmt.Printf("Error indexing document for pipeline %d: %v\n", record.PipelineID, err)
                    } else {
                        fmt.Printf("Elasticsearch error for pipeline %d: %s\n", record.PipelineID, res.Error.Reason)
                    }
                },
                Index: e.index,
            },
        )

        if err != nil {
            return fmt.Errorf("error adding document to bulk indexer for pipeline %d: %w", record.PipelineID, err)
        }
    }

    return nil
}

Demo from the UI

Below is an image based demo of sending data from Snowflake (data warehouse) with CDC (diffs within the warehouse) to Algolia, a customer success platform, and MongoDB Cloud (Project Events data) to Elasticsearch, a search and analytics engine.

Step 1: Setup Sources

Step 2: Setup Destinations

Snowflake table data → Algolia

A pipeline configured to send data from snowflake to elasticsearch.

Once the pipeline starts, we see the platform create and manage dynamic tables and streams in the snowflake UI for all the tables present. These dynamic tables and streams enable diffing within the warehouse, explained more in detail in the write-up above this demo.

Data appearing in Algolia:

MongoDB Events data → Elasticsearch

A pipeline configured to send data from snowflake to elasticsearch.

Data appearing in Elasticsearch: