00
DAYS
00
HRS
00
MIN
00
SEC
SEE WHAT's NEW IN SIGMA TODAY!
A yellow arrow pointing to the right.
A yellow arrow pointing to the right.
Yifeng Wu
Software Engineer
October 4, 2024

Optimizing Data Retrieval: A Deep Dive Into Sigma's Use Of Arrow Batches

October 4, 2024
Optimizing Data Retrieval: A Deep Dive Into Sigma's Use Of Arrow Batches

At Sigma, we've integrated Apache Arrow into our architecture to improve how data moves across services, cutting out inefficiencies and improving performance. Arrow's zero-copy, in-memory columnar format allows us to handle large datasets with speed and precision, especially when working with cloud data warehouses like Snowflake and Databricks.

In this blog, we’ll dive into the technical aspects of how we integrated Arrow batches directly from the data warehouse, we solved along the way, and how these changes enhance the reliability of our platform.

Optimizing data flow with Apache Arrow in Sigma

What's Apache Arrow?

Apache Arrow is a high-performance, multi-language columnar format that enables zero-copy read operations. By leveraging in-memory data representation, it ensures fast reads and writes. At Sigma, we use Apache Arrow to move data across stacks.

Sigma’s connection manager

Sigma’s architecture includes a connection manager service that stands at the frontier to manage connections, load, and retrieve data from warehouses. Traditionally, this process involved unpacking and repacking data before passing it to another service for transformation. However, some drivers (e.g., Snowflake, Databricks) provide a way to fetch Arrow batches directly rather than row-based data. This makes it possible to eliminate the redundant unpacking and repacking steps in the connection manager, allowing data to travel directly to the final destination and bypassing unnecessary detours.

Benefit of Arrow batches

At Sigma, each service is specialized for specific features within our query lifecycle. These services are independently deployed, scaled, and resourced, allowing for efficient resource management and enhanced reliability. The connection manager focuses solely on managing connections and load, while a separate Rust-based service, which is specifically designed for high CPU or memory performance tasks, handles results transformation. This service's role and benefits are detailed in our post about making Pivot Tables from a billion rows of data.

By utilizing Arrow batch, we eliminate the need for data unpacking and repacking within the connection manager. The connection manager now delegates result serialization to another service built specifically for this task. This optimization allows the connection manager to focus on its primary role, thereby improving its reliability and efficiency.

Simplifying data travel with Arrow batches

Previously, the connection manager fetches data from the driver by receiving a handle to a row-based format driver.Rows. It iterates over this driver.Rows and re-construct arrow records, although internally the driver receives data in arrow format.

Our solution was to eliminate this detour of copy/conversion and directly fetch Arrow records, passing them to other services for transformation. 

The driver offers a flag to control whether or not to fetch Arrow batches. 

Diagram of a Data Flow from Snowflake to Sigma
Data Flow Diagram (Qing Feng) 
  1. Download raw chunks in parallel from CDW. 
  2. Decode raw chunks into go values or arrow batches based whether arrow batch is enabled.  The decoding process varies slightly depending on the configuration of various flags, which we will explore in detail. E.g gosnowflake driver offers those options:some text
    1. WithArrowBatchesTimestampOption: specify different timestamp options
    2. WithHigherPrecision: Keep BigDecimal or decode them into floats/ints. 
    3. WithArrowBatchesUtf8Validation: Extra check for malformed data 
  3. Transform: Previously Arrow chunks were built from rows of values; with arrow batches this step is no longer needed.  

Switching to Arrow batches

Switching to Arrow batches involves critical considerations beyond merely enabling the feature. Addressing these questions ensures a smooth and confident transition.

Ensuring data correctness

The Arrow batches output by the driver differ from the Arrow data we previously constructed from driver.Rows. Previously, we converted data to Go types and rebuilt it; now, we retain data in Arrow format. Key considerations in transitioning to Arrow batches include: 

  • Differences in decoding logic for timestamps and numbers before and after the switch.
  • Optimal driver flag configuration.
  • Whether decoding should occur in the driver or be deferred to the upstream service, and how the upstream service should accommodate this change.
  • Potential overflow limitations.

Managing Arrow chunk size

With the driver now controlling Arrow chunk sizes, we need to determine how sizes are managed, understand their distribution, and assess whether we need to implement a control mechanism.

Comprehensive testing

Data integrity is our top priority in the Arrow batches transition. We validate correctness and consistency using our existing unit and end-to-end tests, ensuring accurate, regression-free results across all data types.

Challenges tackled along the way

While different drivers may have slight variations in their Arrow batch settings, our journey began with using Arrow batches for Snowflake. Here’s some challenges that we tackled along the way.

Challenge 1: Timestamp

Year 2262 Problem

Arrow timestamp type is stored as 64-bit integers, which limits their range. For nanosecond precision, this means timestamps cannot represent dates beyond the year 2262 or before 1677, known as the Year 2262 problem.

How Snowflake solves year 2262 problem

Snowflake uses a custom arrow.Struct type with components like epoch and fractional seconds to represent timestamps beyond the Year 2262, avoiding overflow issues with the standard arrow.Timestamp. The go-snowflake driver offers a flag (UseOriginalTimestamp) to either retain this custom format or decode it into a standard arrow.Timestamp with nanosecond precision, which may face the Year 2262 problem.

Problem 

Retaining Snowflake’s custom arrow.Struct timestamp requires upstream services to manage custom logic, which may be written in different languages and needs to be updated with driver changes, making maintenance challenging. Using native arrow.Timestamp with nanosecond precision may cause Year 2262 issues, often because distant future dates are used for efficiency instead of null values.

Solution

Sigma only supports timestamps up to microsecond precision. This allows us to use the native arrow.Timestamp without worrying about the Year 2262 problem. We made this change to add support for decoding arrow.Struct into arrow.Timestamp with microsecond or millisecond precision. 

A more optimized approach is to signal the server to send arrow.Timestamp with the required precision directly, avoiding the need to repack Snowflake’s arrow.Struct within the driver.

Challenge 2: Higher precision

Problem 

Snowflake server returns to driver BigDecimal (arrow.Decimal) natively. When decoding to arrow batches, it converts all non-zero scale numbers to float64, potentially resulting in loss of precision. Zero-scale numbers such as DECIMAL128 will be converted to int64, which could lead to overflow.

Solution

We added this change, to enable high precision behind a flag. When using high precision, arrow.Decimal data remains unconverted in driver at decoding, providing two main benefits: it helps avoid precision loss and defers the resource-intensive conversion operations to upstream services.

Challenge 3: Invalid UTF-8 

Problem 

Historically, Snowflake allowed users to upload invalid UTF-8 characters, leading to occasional occurrences of Arrow records containing invalid UTF-8 characters in arrow.String columns.

However by definition arrow.String column should contain UTF-8 characters only (here). This situation could potentially cause issues for other upstream services that consume the flawed Arrow record. For instance, the Rust Arrow library enforces some basic validation, and this flawed Arrow record would fail the validation process.

Solution

We added invalid UTF-8 validation behind a flag (here), so the driver can iterate through all values of string columns and replace any invalid characters encountered.

Challenge 4: Arrow batches rechunking

Problem 

Previously, when re-constructing arrow records by iterating over driver.Rows, we can control the size of arrow chunk. When using arrow batches, the connection manager outputs arrow batches received from Snowflake, which means we have no control over chunk size. 

This table shows the chunk size in MB received from Snowflake over a period of time, they mostly fall below 0.5MB. However, we have noticed occasional larger chunks, resulting in a long tail distribution. 

Table Showing Arrow Chunk Size in MB from Snowflake

Solution

To optimize performance, we break down Arrow batches into smaller chunks if their size exceeds a certain limit.

Enhanced System Performance and Stability

We've witnessed remarkable improvements in reliability and resource utilization, particularly with large query results.

For example, in a benchmarking test with a table containing 200 columns and 1 million rows, connection manager saw a significant 75% reduction in memory usage.

Example chart showing memory usage

CPU utilization has also undergone a remarkable decrease, dropping by over 90%.

Diagram of a Data Flow from Snowflake to Sigma

These enhancements bring substantial benefits for both Sigma and our customers. By reducing memory and CPU usage, we can support more variable workloads and improve overall system performance. This means customers can execute queries with larger result size without encountering issues, leading to a smoother user experience. Most excitingly, container OOM crashes in production have been nearly eliminated, ensuring greater stability and reliability for our customers’ experiences.

Example chart showing container restarts

Looking ahead

Sigma’s adoption of Apache Arrow has drastically transformed how we handle data, driving significant gains in performance, efficiency, and reliability. By eliminating unnecessary data conversions and optimizing our resource usage, we've reduced memory and CPU utilization, ensuring faster, more stable experiences for our customers.

These advancements enable Sigma to handle larger, more complex workloads while maintaining the seamless, real-time data interaction that our users rely on.

Apart from Snowflake, both BigQuery and Databricks also support arrow batches, and we're actively working towards integrating with them as well. 

Join us at Sigma Computing

At Sigma, we are dedicated to pushing the boundaries of what’s possible in data analytics and invite passionate engineers to join our team. If you want to work on cutting-edge projects and shape the future of data exploration, apply now to one of our open roles.

STATE OF BI SURVEY