Cauchy — Threat Detection and Response at Scale

Serverless processing of security logs using DuckDB and delta-rs

This post is adapted from work performed by the research and development team in DNB’s Cyber Defense Center. The original scope was to investigate how to reduce the cost of ingesting security logs while minimizing the negative impact on threat detection metrics, e.g., SLAs, latencies and operational overheads.

Background

Delta tables have been used for security log storage from the very beginning: when Delta Lake was introduced at the 2018 Spark+AI Summit, Apple’s threat detection and response platform was one of the showcase use cases. Apple’s architecture was built around a design where upstream systems wrote to S3, and Spark Structured Streaming jobs then used the s3-sqs source to process new files as they arrived.

Instead of utilizing Spark’s JSON reader, the files were handled as lines of text, and each line was parsed using an extended version of from_json, with the original raw data preserved as a separate column. Additional metadata columns were also added to:

  • enforce data quality by evaluating rules, e.g., “timestamp != null”
  • assist in debugging
  • measure latencies at each stage, and
  • track lineage.

A bad line, or one whose contents do not match the schema, will result in null values being extracted. Retaining the raw data and metadata columns therefore helps in debugging what went wrong, gives the ability to re-parse the event, and so protects against data loss from corrupt records. After this initial stage, the data moves through a series of downstream transformation pipelines. These pipelines get pretty sophisticated and are well worth studying, but they’re beyond the scope of what we’re discussing here.
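To make the pattern concrete, here is a minimal PySpark sketch of it. This is not Apple’s actual code: the extended from_json they describe is replaced by the standard function, and the schema, path and column names are made-up placeholders (an active spark session is assumed).

# Minimal sketch of the parse-as-text pattern: keep the raw line, parse it,
# and add metadata columns for quality checks, lineage and latency.
from pyspark.sql.functions import col, current_timestamp, from_json, input_file_name

event_schema = "timestamp TIMESTAMP, hostname STRING, process STRING"  # illustrative

parsed = (
    spark.readStream.text("s3://security-logs/raw/")  # each record is one line of text
    .select(
        from_json(col("value"), event_schema).alias("event"),
        col("value").alias("_raw"),                  # preserve the original line
        input_file_name().alias("_source_file"),     # lineage / debugging
        current_timestamp().alias("_ingested_at"),   # latency measurement
    )
    # A simple data quality rule, e.g. "timestamp != null"
    .withColumn("_quality_ok", col("event.timestamp").isNotNull())
    .select("event.*", "_raw", "_source_file", "_quality_ok", "_ingested_at")
)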

Extract, Load and Transform

While Apple operates at a scale far beyond most organizations, we all face the same challenge: getting security logs into Delta tables efficiently. In this article, we’ll explore how to ingest security logs in a cost-effective way by moving away from Apache Spark and the JVM altogether.

The general architecture of a security log ingestion pipeline is shown below:

[Figure: JSON files land in object storage, a parser applies the schema and appends the events to a Delta table, and corrupt records are routed to a dead letter queue.]
Data is written by a data provider to object storage or a system like Kafka. These are then picked up by the parser, whose responsibility is to:

  1. Apply the proper schema
  2. Add columns for lineage tracking and other metadata
  3. Evaluate data quality rules

Records that the parser marks as corrupt, e.g., ones that do not conform to the schema, are written to a dead letter queue (DLQ), where they are fixed by a separate job.

Fig 1: A high-level Delta Lake ingestion pipeline for security logs.

Microsoft Defender for XDR

The logs we will process come from Microsoft Defender for XDR, which is a suite of tools that provides comprehensive telemetry from:

  • Endpoints (Microsoft Defender for Endpoint)
  • Email and collaboration tools (Defender for Office 365)
  • Identities (Defender for Identity and Microsoft Entra ID)
  • Cloud Applications (Defender for Cloud Apps)
  • Cloud Infrastructure (Defender for Cloud)
  • Vulnerability Management

The export options offered by Microsoft are either:

  1. streaming into an Event Hub, or
  2. writing to append blobs in storage accounts that are rotated on an hourly basis.

We built our threat detection platform on Azure Databricks using Delta Live Tables with the medallion architecture: the Kafka connector handled Event Hub ingestion, and Spark Structured Streaming jobs ran detection queries against the silver tables.

When Microsoft exports security logs to an event hub (which is equivalent to a Kafka topic), it uses a specific JSON payload structure that adds complexity to the processing pipeline. This payload follows the format:

{
  "records": [
    {
      "time": "<The time WDATP received the event>"
      "tenantId": "<tenantId>",
      "category": "<AdvancedHunting-category>",
      "properties": { 
        <WDATP Advanced Hunting event as Json> 
      }
    }
  ]
}

The schema of the JSON payload when Defender for XDR logs are streamed to an event hub.

Processing this structure requires multiple steps:

  1. First, we use an explode operation on the records field to transform the array elements into individual rows.
  2. Since the schema of the properties column varies based on the category value, we can’t apply a uniform schema directly.
  3. Instead, we perform partial deserialization to extract just the category and timestamp.
  4. We then write this data to a bronze-layer MDE table, partitioned by both category and timestamp, as sketched below.
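A minimal PySpark sketch of these four steps follows. It is illustrative rather than our production DLT code: the Event Hub connection details, table name and checkpoint path are placeholders, and it relies on Spark returning the raw JSON text for fields declared as strings so that properties can be deserialized later.

# Illustrative sketch of the bronze step; connection details, names and paths are placeholders.
from pyspark.sql.functions import col, explode, from_json, get_json_object, to_date, to_timestamp

raw = (
    spark.readStream
    .format("kafka")  # Event Hubs exposes a Kafka-compatible endpoint
    .option("kafka.bootstrap.servers", "<namespace>.servicebus.windows.net:9093")
    .option("subscribe", "<event-hub-name>")
    .load()
)

bronze = (
    raw
    # 1. Explode the records array; each element is kept as a raw JSON string.
    .select(from_json(col("value").cast("string"), "records array<string>").alias("payload"))
    .select(explode(col("payload.records")).alias("record"))
    # 2-3. Partial deserialization: extract only category and timestamp.
    .select(
        to_timestamp(get_json_object(col("record"), "$.time")).alias("Timestamp"),
        get_json_object(col("record"), "$.category").alias("p_category"),
        get_json_object(col("record"), "$.properties").alias("properties"),
    )
    .withColumn("p_date", to_date(col("Timestamp")))
)

# 4. Append to the bronze MDE table, partitioned by category and the event date.
(
    bronze.writeStream
    .format("delta")
    .option("checkpointLocation", "<checkpoint-path>/bronze_mde")
    .partitionBy("p_category", "p_date")
    .toTable("bronze_mde")
)

Note that the sketch partitions on a date derived from the timestamp; partitioning directly on a raw timestamp would create one partition per distinct value.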

For each category, we create a separate processing stream that filters by category, applies the appropriate schema, adds metadata for lineage tracking, data quality control, and latency measurements, then writes to silver-layer tables:

[Figure: an Event Hub (Kafka API) feeds a bronze Delta table (MDE), with the timestamp and category fields extracted and used as partition columns; per-category streams with a partition filter, e.g. p_category = 'DeviceProcessEvents', apply the schema and write to silver Delta tables such as DeviceEvents, DeviceProcessEvents and DeviceRegistryEvents.]

Fig 3: A continuously streaming DLT pipeline for processing Defender for XDR data from an event hub.
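For illustration, one such per-category stream could look roughly like the sketch below, assuming the bronze table from the previous sketch; the schema is heavily truncated and all names and paths are placeholders.

# Illustrative sketch of one per-category silver stream (DeviceProcessEvents).
from pyspark.sql.functions import col, current_timestamp, from_json

# Only a handful of columns shown; the real Advanced Hunting schema is much wider.
process_events_schema = (
    "Timestamp TIMESTAMP, DeviceName STRING, AccountUpn STRING, "
    "FileName STRING, InitiatingProcessFileName STRING"
)

silver = (
    spark.readStream.table("bronze_mde")
    .where(col("p_category") == "AdvancedHunting-DeviceProcessEvents")  # partition filter
    .select(
        from_json(col("properties"), process_events_schema).alias("event"),
        col("properties").alias("_raw"),                    # keep raw payload for re-parsing
        current_timestamp().alias("_silver_ingested_at"),   # latency measurement
    )
    .select("event.*", "_raw", "_silver_ingested_at")
)

(
    silver.writeStream
    .format("delta")
    .option("checkpointLocation", "<checkpoint-path>/DeviceProcessEvents")
    .toTable("DeviceProcessEvents")
)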

While this DLT pipeline works really well - it has great autoscaling capabilities and has been stable except for occasional Azure issues (Norway East!) - it’s way too expensive for what amounts to simply parsing JSON and writing Parquet files to object storage. The fundamental issue lies with Apache Spark itself. The JVM runtime overhead and the complexity of distributed frameworks make it unnecessarily resource-intensive for our use case. Spark is designed for complex data processing with operations like grouping, joining, and merging events - when all we need is a straightforward streaming pipeline that appends to a table with lightweight transformations. We considered migrating from DLT to vanilla Spark structured streaming jobs, but we would lose the enhanced autoscaler and would introduce more engineering overhead.

Moving off the JVM

In 2021, engineers at Scribd - Christian Williams, QP Hou, and R. Tyler Croy - faced the same problem. They needed a way to stream data from Kafka into Delta Lake that was fast, cheap and scalable. They had earlier created delta-rs, a native Rust library for Delta Lake, and realized they could use it to build kafka-delta-ingest (KDI), a daemon service that writes the contents of a topic into a Delta table.

The design document explains in detail how KDI works, but the main points can be summarized as follows:

  • A job is a group of processes that are responsible for reading messages from one Kafka topic and writing them to a single Delta table. Partitions of that topic are distributed amongst the processes through Kafka’s consumer group mechanism.

  • In order to avoid the small file problem and to be more efficient, kafka-delta-ingest buffers the transformed data in memory before writing the Parquet files directly to object storage. The actual object storage write occurs when the memory buffer exceeds a configured size threshold or when a specified maximum duration has passed.

  • Transformations are expressed using JMESPath queries and are typically used to extract partition columns or include other metadata (see the small example after this list).

  • When a process eventually performs the append operation, it commits the offset of the associated partition as a transaction identifier. At startup, or after a Kafka rebalance, a process can query the Delta log to find the last processed offset for each partition it is assigned.
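As a small standalone illustration of what such an expression can extract, here is the same idea using the jmespath Python package against a made-up Defender-style message (KDI evaluates its expressions internally; the exact set of helper functions it adds is out of scope here):

# Standalone JMESPath illustration using the `jmespath` package and a made-up message.
import json

import jmespath

message = json.loads("""
{
  "records": [
    {
      "time": "2024-05-01T12:34:56Z",
      "tenantId": "00000000-0000-0000-0000-000000000000",
      "category": "AdvancedHunting-DeviceProcessEvents",
      "properties": {"FileName": "powershell.exe"}
    }
  ]
}
""")

# Expressions like these can be used to derive partition columns or metadata columns.
print(jmespath.search("records[0].category", message))             # AdvancedHunting-DeviceProcessEvents
print(jmespath.search("records[0].properties.FileName", message))  # powershell.exe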

The last point in this list is worth highlighting, as it is this mechanism that eliminates the need for processes to talk to each other. By storing offsets in the Delta table itself, each process works independently. This clever approach turns what would typically need a complex distributed system into something much simpler - just separate processes that can easily scale up. There’s no external database needed to track state - everything lives in the Delta log.

To make this concept more concrete, it’s best to illustrate it using delta-rs. Let’s define a new Delta table, ProcessEvents, that we want to insert data into:

import pyarrow as pa
from deltalake import DeltaTable

# Schema of the table; only a few of the columns are shown here.
schema = pa.schema(
  [
    pa.field("Timestamp", pa.timestamp("us", tz="UTC")),
    pa.field("AccountUpn", pa.string()),
    # ...
    pa.field("InitiatingProcessFileName", pa.string()),
  ]
)

dt = DeltaTable.create(
  "ProcessEvents",
  schema=schema,
  mode="overwrite",
)
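To see where the offsets actually end up, we can look at the Delta log itself. The Delta protocol allows a commit to carry txn actions of the form {"appId": ..., "version": ...}, and it is these that kafka-delta-ingest uses to record the last offset written for each topic partition. The snippet below is a hedged illustration, not part of KDI or delta-rs: it simply replays the JSON commit files of the ProcessEvents table we just created and prints the highest recorded version per appId (checkpoint files are ignored for brevity).

# Hedged illustration: read `txn` actions straight out of the Delta log.
import json
from pathlib import Path


def latest_txn_versions(table_path):
    """Return the highest committed version per appId found in the JSON commits."""
    versions = {}
    for commit in sorted(Path(table_path, "_delta_log").glob("*.json")):
        for line in commit.read_text().splitlines():
            action = json.loads(line)
            if "txn" in action:
                txn = action["txn"]
                versions[txn["appId"]] = txn["version"]
    return versions


print(latest_txn_versions("ProcessEvents"))

Because the offsets are committed atomically with the data they describe, a restarted or rebalanced process can resume exactly where the table says it left off.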

We have used KDI for streaming Corelight data from event hubs using Azure Container Apps, some running continuously and others, for the low-volume topics, scheduled as cron jobs. With a simple autoscaling system that adds processes to a job when the average CPU usage or other metrics exceed a certain threshold, we have achieved a simple, cheap and low-maintenance solution that reduced costs by 90%.

However, as pointed out earlier, when Microsoft exports Defender for XDR data to an event hub, the different event types are not only sent to the same topic but also mixed together in the nested properties column. This structure doesn’t align well with KDI’s 1:1 topic-to-table pattern. We would still need an intermediate bronze layer partitioned by category, followed by separate streaming jobs for each category. This approach would force us to manage dozens of Spark streaming jobs, reintroducing the complexity we aimed to avoid.

Okay, so why don't you...