Add support for writing a data frame as Parquet format (#46)
fix #36

Note that the write options are incomplete. Currently, only the maximum
row group size can be configured.
kou committed Apr 3, 2023
1 parent b433005 commit a308ca3
Showing 13 changed files with 276 additions and 16 deletions.
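Before the per-file changes, here is a minimal sketch of how the new GLib API is meant to be used. Only the calls added in this commit plus standard GLib error handling are shown; the data frame and the output path are assumed to come from elsewhere (for example, an earlier query), and the 4096-row limit is just an illustrative value.

#include <datafusion-glib/datafusion-glib.h>

/* Write an existing data frame as Apache Parquet, limiting row groups to
 * 4096 rows. Pass NULL instead of properties to use the default settings. */
static gboolean
write_data_frame_as_parquet(GDFDataFrame *data_frame,
                            const gchar *output_path,
                            GError **error)
{
  GDFParquetWriterProperties *properties = gdf_parquet_writer_properties_new();
  gdf_parquet_writer_properties_set_max_row_group_size(properties, 4096);
  gboolean success = gdf_data_frame_write_parquet(data_frame,
                                                  output_path,
                                                  properties,
                                                  error);
  g_object_unref(properties);
  return success;
}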
3 changes: 2 additions & 1 deletion .github/workflows/ci.yaml
@@ -1,4 +1,4 @@
# Copyright 2022 Sutou Kouhei <kou@clear-code.com>
# Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -111,6 +111,7 @@ jobs:
sudo apt install -y -V \
libarrow-glib-dev \
libgirepository1.0-dev \
libparquet-glib-dev \
ninja-build \
valac
pip install meson
1 change: 1 addition & 0 deletions Cargo.toml
@@ -41,6 +41,7 @@ arrow = { version = "34", features = ["ffi", "prettyprint"] }
arrow-data = "34"
datafusion = "21"
libc = "0.2"
parquet = { version = "34", features = ["arrow", "async"] }
tokio = "1"

[package.metadata.capi.header]
4 changes: 3 additions & 1 deletion Gemfile
@@ -1,6 +1,6 @@
# -*- ruby -*-
#
# Copyright 2022 Sutou Kouhei <kou@clear-code.com>
# Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,5 +16,7 @@

source "https://rubygems.org/"

gem "rake"
gem "red-arrow"
gem "red-parquet"
gem "test-unit"
6 changes: 5 additions & 1 deletion datafusion-glib/data-frame-raw.h
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Sutou Kouhei <kou@clear-code.com>
* Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,4 +26,8 @@ GDF_AVAILABLE_IN_10_0
GDFDataFrame *
gdf_data_frame_new_raw(DFDataFrame *raw_data_frame);

GDF_AVAILABLE_IN_21_0
DFParquetWriterProperties *
gdf_parquet_writer_properties_get_raw(GDFParquetWriterProperties *properties);

G_END_DECLS
122 changes: 121 additions & 1 deletion datafusion-glib/data-frame.c
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Sutou Kouhei <kou@clear-code.com>
* Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,9 +23,83 @@ G_BEGIN_DECLS
* SECTION: data-frame
* @include: datafusion-glib/datafusion-glib.h
*
* #GDFParquetWriterProperties is a class to customize how to write
* an Apache Parquet file.
*
* #GDFDataFrame is a data frame class.
*/

typedef struct GDFParquetWriterPropertiesPrivate_ {
DFParquetWriterProperties *properties;
} GDFParquetWriterPropertiesPrivate;

enum {
PROP_PARQUET_WRITER_PROPERTIES = 1,
};

G_DEFINE_TYPE_WITH_PRIVATE(GDFParquetWriterProperties,
gdf_parquet_writer_properties,
G_TYPE_OBJECT)

static void
gdf_parquet_writer_properties_finalize(GObject *object)
{
GDFParquetWriterPropertiesPrivate *priv =
gdf_parquet_writer_properties_get_instance_private(
GDF_PARQUET_WRITER_PROPERTIES(object));
df_parquet_writer_properties_free(priv->properties);
G_OBJECT_CLASS(gdf_parquet_writer_properties_parent_class)->finalize(object);
}

static void
gdf_parquet_writer_properties_init(GDFParquetWriterProperties *object)
{
GDFParquetWriterPropertiesPrivate *priv =
gdf_parquet_writer_properties_get_instance_private(
GDF_PARQUET_WRITER_PROPERTIES(object));
priv->properties = df_parquet_writer_properties_new();
}

static void
gdf_parquet_writer_properties_class_init(GDFParquetWriterPropertiesClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS(klass);
gobject_class->finalize = gdf_parquet_writer_properties_finalize;
}

/**
* gdf_parquet_writer_properties_new:
*
* Returns: A newly created Apache Parquet writer properties object.
*
* Since: 21.0.0
*/
GDFParquetWriterProperties *
gdf_parquet_writer_properties_new(void)
{
return g_object_new(GDF_TYPE_PARQUET_WRITER_PROPERTIES, NULL);
}

/**
* gdf_parquet_writer_properties_set_max_row_group_size:
* @properties: A #GDFParquetWriterProperties.
* @size: The maximum number of rows in a row group.
*
* Set the maximum number of rows in a row group.
*
* Since: 21.0.0
*/
void
gdf_parquet_writer_properties_set_max_row_group_size(
GDFParquetWriterProperties *properties,
guint64 size)
{
GDFParquetWriterPropertiesPrivate *priv =
gdf_parquet_writer_properties_get_instance_private(properties);
df_parquet_writer_properties_set_max_row_group_size(priv->properties, size);
}


typedef struct GDFDataFramePrivate_ {
DFDataFrame *data_frame;
} GDFDataFramePrivate;
@@ -162,10 +236,56 @@ gdf_data_frame_to_table(GDFDataFrame *data_frame, GError **error)
return table;
}

/**
* gdf_data_frame_write_parquet:
* @data_frame: A #GDFDataFrame.
* @path: An output path.
* @properties: (nullable): Properties that control how the Apache Parquet file is written.
* @error: (nullable): Return location for a #GError or %NULL.
*
* Returns: %TRUE on success, %FALSE otherwise.
*
* Since: 21.0.0
*/
gboolean
gdf_data_frame_write_parquet(GDFDataFrame *data_frame,
const gchar *path,
GDFParquetWriterProperties *properties,
GError **error)
{
GDFDataFramePrivate *priv = gdf_data_frame_get_instance_private(data_frame);
DFParquetWriterProperties *df_properties = NULL;
if (properties) {
df_properties = gdf_parquet_writer_properties_get_raw(properties);
}
DFError *df_error = NULL;
gboolean success = df_data_frame_write_parquet(priv->data_frame,
path,
df_properties,
&df_error);
if (!success) {
g_set_error(error,
GDF_ERROR,
df_error_get_code(df_error),
"[data-frame][write-parquet] %s",
df_error_get_message(df_error));
df_error_free(df_error);
}
return success;
}

GDFDataFrame *
gdf_data_frame_new_raw(DFDataFrame *data_frame)
{
return g_object_new(GDF_TYPE_DATA_FRAME,
"data-frame", data_frame,
NULL);
}

DFParquetWriterProperties *
gdf_parquet_writer_properties_get_raw(GDFParquetWriterProperties *properties)
{
GDFParquetWriterPropertiesPrivate *priv =
gdf_parquet_writer_properties_get_instance_private(properties);
return priv->properties;
}
29 changes: 28 additions & 1 deletion datafusion-glib/data-frame.h
@@ -1,5 +1,5 @@
/*
* Copyright 2022 Sutou Kouhei <kou@clear-code.com>
* Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,27 @@

G_BEGIN_DECLS

#define GDF_TYPE_PARQUET_WRITER_PROPERTIES \
(gdf_parquet_writer_properties_get_type())
G_DECLARE_DERIVABLE_TYPE(GDFParquetWriterProperties,
gdf_parquet_writer_properties,
GDF,
PARQUET_WRITER_PROPERTIES,
GObject)
struct _GDFParquetWriterPropertiesClass
{
GObjectClass parent_class;
};

GDF_AVAILABLE_IN_21_0
GDFParquetWriterProperties *
gdf_parquet_writer_properties_new(void);
GDF_AVAILABLE_IN_21_0
void
gdf_parquet_writer_properties_set_max_row_group_size(
GDFParquetWriterProperties *properties,
guint64 size);

#define GDF_TYPE_DATA_FRAME (gdf_data_frame_get_type())
G_DECLARE_DERIVABLE_TYPE(GDFDataFrame,
gdf_data_frame,
@@ -39,6 +60,12 @@ gdf_data_frame_show(GDFDataFrame *data_frame, GError **error);
GDF_AVAILABLE_IN_10_0
GArrowTable *
gdf_data_frame_to_table(GDFDataFrame *data_frame, GError **error);
GDF_AVAILABLE_IN_21_0
gboolean
gdf_data_frame_write_parquet(GDFDataFrame *data_frame,
const gchar *path,
GDFParquetWriterProperties *properties,
GError **error);


G_END_DECLS
2 changes: 1 addition & 1 deletion package/apt/debian-bookworm/Dockerfile
@@ -76,5 +76,5 @@ RUN \
) > /usr/bin/${exe##*/} && \
chmod +x /usr/bin/${exe##*/}; \
done && \
MAKEFLAGS="-j$(nproc)" gem install red-arrow && \
MAKEFLAGS="-j$(nproc)" gem install red-parquet && \
apt clean
4 changes: 2 additions & 2 deletions package/apt/debian-bullseye/Dockerfile
@@ -1,4 +1,4 @@
# Copyright 2022 Sutou Kouhei <kou@clear-code.com>
# Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -73,6 +73,6 @@ RUN \
done && \
pip3 install --upgrade meson && \
ln -s /usr/local/bin/meson /usr/bin/ && \
MAKEFLAGS="-j$(nproc)" gem install red-arrow && \
MAKEFLAGS="-j$(nproc)" gem install red-parquet && \
apt clean && \
rm -rf /var/lib/apt/lists/*
4 changes: 2 additions & 2 deletions package/apt/ubuntu-focal/Dockerfile
@@ -1,4 +1,4 @@
# Copyright 2022 Sutou Kouhei <kou@clear-code.com>
# Copyright 2022-2023 Sutou Kouhei <kou@clear-code.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -71,6 +71,6 @@ RUN \
done && \
pip3 install --upgrade meson && \
ln -s /usr/local/bin/meson /usr/bin/ && \
MAKEFLAGS="-j$(nproc)" gem install red-arrow && \
MAKEFLAGS="-j$(nproc)" gem install red-parquet && \
apt clean && \
rm -rf /var/lib/apt/lists/*
2 changes: 1 addition & 1 deletion package/apt/ubuntu-jammy/Dockerfile
@@ -70,6 +70,6 @@ RUN \
) > /usr/bin/${exe##*/} && \
chmod +x /usr/bin/${exe##*/}; \
done && \
MAKEFLAGS="-j$(nproc)" gem install red-arrow && \
MAKEFLAGS="-j$(nproc)" gem install red-parquet && \
apt clean && \
rm -rf /var/lib/apt/lists/*
77 changes: 77 additions & 0 deletions src/capi.rs
@@ -35,6 +35,7 @@ use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionContext;
use datafusion::execution::options::CsvReadOptions;
use datafusion::execution::options::ParquetReadOptions;
use parquet::file::properties::WriterProperties;

fn strdup(rs_str: &str) -> *mut libc::c_char {
unsafe {
@@ -370,6 +371,50 @@ pub struct DFArrowArray {
private_data: *mut libc::c_void,
}

/// \struct DFParquetWriterProperties
/// \brief A struct to customize how to write an Apache Parquet file.
///
/// You need to free this with `df_parquet_writer_properties_free()`
/// when no longer needed.
pub struct DFParquetWriterProperties {
max_row_group_size: Option<usize>,
}

impl DFParquetWriterProperties {
pub fn new() -> Self {
Self {
max_row_group_size: None,
}
}

pub fn build(&self) -> WriterProperties {
let mut builder = WriterProperties::builder();
if let Some(size) = self.max_row_group_size {
builder = builder.set_max_row_group_size(size);
}
builder.build()
}
}

#[no_mangle]
pub extern "C" fn df_parquet_writer_properties_new() -> Box<DFParquetWriterProperties> {
Box::new(DFParquetWriterProperties::new())
}

#[no_mangle]
pub extern "C" fn df_parquet_writer_properties_free(
_properties: Option<Box<DFParquetWriterProperties>>,
) {
}

#[no_mangle]
pub extern "C" fn df_parquet_writer_properties_set_max_row_group_size(
properties: &mut DFParquetWriterProperties,
size: usize,
) {
properties.max_row_group_size = Some(size);
}

fn block_on<F: Future>(future: F) -> F::Output {
tokio::runtime::Runtime::new().unwrap().block_on(future)
}
@@ -415,6 +460,38 @@ pub extern "C" fn df_data_frame_show(
block_on(future).into_df_error(error, None);
}

/// \brief Write the given data frame contents in Apache Parquet format.
///
/// \param data_frame A `DFDataFrame` to be written.
/// \param path An output path.
/// \param writer_properties Properties that control how the Apache Parquet file is written.
/// \param error Return location for a `DFError` or `NULL`.
#[no_mangle]
pub extern "C" fn df_data_frame_write_parquet(
data_frame: &mut DFDataFrame,
path: *const libc::c_char,
writer_properties: Option<&DFParquetWriterProperties>,
error: *mut *mut DFError,
) -> bool {
let maybe_success = || -> Option<bool> {
let cstr_path = unsafe { CStr::from_ptr(path) };
let maybe_rs_path = cstr_path.to_str().into_df_error(error, None);
let rs_path = match maybe_rs_path {
Some(rs_path) => rs_path,
None => return None,
};
let maybe_rs_writer_properties =
writer_properties.map(|properties| properties.build());
let future = data_frame
.data_frame
.clone()
.write_parquet(rs_path, maybe_rs_writer_properties);
block_on(future).into_df_error(error, None)?;
Some(true)
}();
maybe_success.unwrap_or(false)
}

#[no_mangle]
pub extern "C" fn df_data_frame_export(
data_frame: &mut DFDataFrame,
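For reference, the same flow through the plain C API exported by src/capi.rs looks roughly like this. The header name and the way the DFDataFrame is obtained are assumptions; the df_parquet_writer_properties_* functions, df_data_frame_write_parquet, and the DFError accessors are the ones defined in this commit.

#include <stdbool.h>
#include <stdio.h>
#include <datafusion.h> /* assumed name of the generated C API header */

/* data_frame is assumed to have been obtained earlier, e.g. from a query. */
static bool
write_parquet_via_capi(DFDataFrame *data_frame, const char *output_path)
{
  DFParquetWriterProperties *properties = df_parquet_writer_properties_new();
  df_parquet_writer_properties_set_max_row_group_size(properties, 4096);
  DFError *error = NULL;
  bool success = df_data_frame_write_parquet(data_frame,
                                             output_path,
                                             properties,
                                             &error);
  if (!success) {
    fprintf(stderr, "write-parquet failed: %s\n", df_error_get_message(error));
    df_error_free(error);
  }
  df_parquet_writer_properties_free(properties);
  return success;
}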
