Optimizing Data Retrieval: A Deep Dive Into Sigma's Use Of Arrow Batches
Table of Contents
At Sigma, we've integrated Apache Arrow into our architecture to improve how data moves across services, cutting out inefficiencies and improving performance. Arrow's zero-copy, in-memory columnar format allows us to handle large datasets with speed and precision, especially when working with cloud data warehouses like Snowflake and Databricks.
In this blog, we’ll dive into the technical aspects of how we integrated Arrow batches directly from the data warehouse, we solved along the way, and how these changes enhance the reliability of our platform.
Optimizing data flow with Apache Arrow in Sigma
What's Apache Arrow?
Apache Arrow is a high-performance, multi-language columnar format that enables zero-copy read operations. By leveraging in-memory data representation, it ensures fast reads and writes. At Sigma, we use Apache Arrow to move data across stacks.
Sigma’s connection manager
Sigma’s architecture includes a connection manager service that stands at the frontier to manage connections, load, and retrieve data from warehouses. Traditionally, this process involved unpacking and repacking data before passing it to another service for transformation. However, some drivers (e.g., Snowflake, Databricks) provide a way to fetch Arrow batches directly rather than row-based data. This makes it possible to eliminate the redundant unpacking and repacking steps in the connection manager, allowing data to travel directly to the final destination and bypassing unnecessary detours.
Benefit of Arrow batches
At Sigma, each service is specialized for specific features within our query lifecycle. These services are independently deployed, scaled, and resourced, allowing for efficient resource management and enhanced reliability. The connection manager focuses solely on managing connections and load, while a separate Rust-based service, which is specifically designed for high CPU or memory performance tasks, handles results transformation. This service's role and benefits are detailed in our post about making Pivot Tables from a billion rows of data.
By utilizing Arrow batch, we eliminate the need for data unpacking and repacking within the connection manager. The connection manager now delegates result serialization to another service built specifically for this task. This optimization allows the connection manager to focus on its primary role, thereby improving its reliability and efficiency.
Simplifying data travel with Arrow batches
Previously, the connection manager fetches data from the driver by receiving a handle to a row-based format driver.Rows. It iterates over this driver.Rows and re-construct arrow records, although internally the driver receives data in arrow format.
Our solution was to eliminate this detour of copy/conversion and directly fetch Arrow records, passing them to other services for transformation.
The driver offers a flag to control whether or not to fetch Arrow batches.
- Download raw chunks in parallel from CDW.
- Decode raw chunks into go values or arrow batches based whether arrow batch is enabled. The decoding process varies slightly depending on the configuration of various flags, which we will explore in detail. E.g gosnowflake driver offers those options:some text
- WithArrowBatchesTimestampOption: specify different timestamp options
- WithHigherPrecision: Keep BigDecimal or decode them into floats/ints.
- WithArrowBatchesUtf8Validation: Extra check for malformed data
- Transform: Previously Arrow chunks were built from rows of values; with arrow batches this step is no longer needed.
Switching to Arrow batches
Switching to Arrow batches involves critical considerations beyond merely enabling the feature. Addressing these questions ensures a smooth and confident transition.
Ensuring data correctness
The Arrow batches output by the driver differ from the Arrow data we previously constructed from driver.Rows. Previously, we converted data to Go types and rebuilt it; now, we retain data in Arrow format. Key considerations in transitioning to Arrow batches include:
- Differences in decoding logic for timestamps and numbers before and after the switch.
- Optimal driver flag configuration.
- Whether decoding should occur in the driver or be deferred to the upstream service, and how the upstream service should accommodate this change.
- Potential overflow limitations.
Managing Arrow chunk size
With the driver now controlling Arrow chunk sizes, we need to determine how sizes are managed, understand their distribution, and assess whether we need to implement a control mechanism.
Comprehensive testing
Data integrity is our top priority in the Arrow batches transition. We validate correctness and consistency using our existing unit and end-to-end tests, ensuring accurate, regression-free results across all data types.
Challenges tackled along the way
While different drivers may have slight variations in their Arrow batch settings, our journey began with using Arrow batches for Snowflake. Here’s some challenges that we tackled along the way.
Challenge 1: Timestamp
Year 2262 Problem
Arrow timestamp type is stored as 64-bit integers, which limits their range. For nanosecond precision, this means timestamps cannot represent dates beyond the year 2262 or before 1677, known as the Year 2262 problem.
How Snowflake solves year 2262 problem
Snowflake uses a custom arrow.Struct type with components like epoch and fractional seconds to represent timestamps beyond the Year 2262, avoiding overflow issues with the standard arrow.Timestamp. The go-snowflake driver offers a flag (UseOriginalTimestamp) to either retain this custom format or decode it into a standard arrow.Timestamp with nanosecond precision, which may face the Year 2262 problem.
Problem
Retaining Snowflake’s custom arrow.Struct timestamp requires upstream services to manage custom logic, which may be written in different languages and needs to be updated with driver changes, making maintenance challenging. Using native arrow.Timestamp with nanosecond precision may cause Year 2262 issues, often because distant future dates are used for efficiency instead of null values.
Solution
Sigma only supports timestamps up to microsecond precision. This allows us to use the native arrow.Timestamp without worrying about the Year 2262 problem. We made this change to add support for decoding arrow.Struct into arrow.Timestamp with microsecond or millisecond precision.
A more optimized approach is to signal the server to send arrow.Timestamp with the required precision directly, avoiding the need to repack Snowflake’s arrow.Struct within the driver.
Challenge 2: Higher precision
Problem
Snowflake server returns to driver BigDecimal (arrow.Decimal) natively. When decoding to arrow batches, it converts all non-zero scale numbers to float64, potentially resulting in loss of precision. Zero-scale numbers such as DECIMAL128 will be converted to int64, which could lead to overflow.
Solution
We added this change, to enable high precision behind a flag. When using high precision, arrow.Decimal data remains unconverted in driver at decoding, providing two main benefits: it helps avoid precision loss and defers the resource-intensive conversion operations to upstream services.
Challenge 3: Invalid UTF-8
Problem
Historically, Snowflake allowed users to upload invalid UTF-8 characters, leading to occasional occurrences of Arrow records containing invalid UTF-8 characters in arrow.String columns.
However by definition arrow.String column should contain UTF-8 characters only (here). This situation could potentially cause issues for other upstream services that consume the flawed Arrow record. For instance, the Rust Arrow library enforces some basic validation, and this flawed Arrow record would fail the validation process.
Solution
We added invalid UTF-8 validation behind a flag (here), so the driver can iterate through all values of string columns and replace any invalid characters encountered.
Challenge 4: Arrow batches rechunking
Problem
Previously, when re-constructing arrow records by iterating over driver.Rows, we can control the size of arrow chunk. When using arrow batches, the connection manager outputs arrow batches received from Snowflake, which means we have no control over chunk size.
This table shows the chunk size in MB received from Snowflake over a period of time, they mostly fall below 0.5MB. However, we have noticed occasional larger chunks, resulting in a long tail distribution.
Solution
To optimize performance, we break down Arrow batches into smaller chunks if their size exceeds a certain limit.
Enhanced System Performance and Stability
We've witnessed remarkable improvements in reliability and resource utilization, particularly with large query results.
For example, in a benchmarking test with a table containing 200 columns and 1 million rows, connection manager saw a significant 75% reduction in memory usage.
CPU utilization has also undergone a remarkable decrease, dropping by over 90%.
These enhancements bring substantial benefits for both Sigma and our customers. By reducing memory and CPU usage, we can support more variable workloads and improve overall system performance. This means customers can execute queries with larger result size without encountering issues, leading to a smoother user experience. Most excitingly, container OOM crashes in production have been nearly eliminated, ensuring greater stability and reliability for our customers’ experiences.
Looking ahead
Sigma’s adoption of Apache Arrow has drastically transformed how we handle data, driving significant gains in performance, efficiency, and reliability. By eliminating unnecessary data conversions and optimizing our resource usage, we've reduced memory and CPU utilization, ensuring faster, more stable experiences for our customers.
These advancements enable Sigma to handle larger, more complex workloads while maintaining the seamless, real-time data interaction that our users rely on.
Apart from Snowflake, both BigQuery and Databricks also support arrow batches, and we're actively working towards integrating with them as well.
Join us at Sigma Computing
At Sigma, we are dedicated to pushing the boundaries of what’s possible in data analytics and invite passionate engineers to join our team. If you want to work on cutting-edge projects and shape the future of data exploration, apply now to one of our open roles.