class documentation

class ParquetDataSet(AbstractDataSet[dd.DataFrame, dd.DataFrame]): (source)

ParquetDataSet loads and saves data to parquet file(s). It uses Dask's remote data services to handle the load and save operations: https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html

Example usage for the YAML API:

cars:
  type: dask.ParquetDataSet
  filepath: s3://bucket_name/path/to/folder
  save_args:
    compression: GZIP
  credentials:
    client_kwargs:
      aws_access_key_id: YOUR_KEY
      aws_secret_access_key: YOUR_SECRET

Example usage for the Python API:

>>> from kedro.extras.datasets.dask import ParquetDataSet
>>> import pandas as pd
>>> import dask.dataframe as dd
>>>
>>> data = pd.DataFrame({'col1': [1, 2], 'col2': [4, 5],
>>>                      'col3': [[5, 6], [7, 8]]})
>>> ddf = dd.from_pandas(data, npartitions=2)
>>>
>>> data_set = ParquetDataSet(
>>>     filepath="s3://bucket_name/path/to/folder",
>>>     credentials={
>>>         'client_kwargs':{
>>>             'aws_access_key_id': 'YOUR_KEY',
>>>             'aws_secret_access_key': 'YOUR_SECRET',
>>>         }
>>>     },
>>>     save_args={"compression": "GZIP"}
>>> )
>>> data_set.save(ddf)
>>> reloaded = data_set.load()
>>>
>>> assert ddf.compute().equals(reloaded.compute())

The output schema can also be specified explicitly using Triad. The specification is processed to map individual columns to PyArrow field types or to a full PyArrow schema. For instance:

parquet_dataset:
  type: dask.ParquetDataSet
  filepath: "s3://bucket_name/path/to/folder"
  credentials:
    client_kwargs:
      aws_access_key_id: YOUR_KEY
      aws_secret_access_key: YOUR_SECRET
  save_args:
    compression: GZIP
    schema:
      col1: [int32]
      col2: [int32]
      col3: [[int32]]
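
The same schema can also be passed through the Python API. A minimal sketch, using the full Triad schema string form described under _process_schema below (the string value is illustrative):

>>> data_set = ParquetDataSet(
>>>     filepath="s3://bucket_name/path/to/folder",
>>>     save_args={
>>>         "compression": "GZIP",
>>>         # full schema specification in Triad's grammar
>>>         "schema": "col1:[int32],col2:[int32],col3:[[int32]]",
>>>     },
>>> )
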
Method __init__ Creates a new instance of ParquetDataSet pointing to concrete parquet files.
Constant DEFAULT_LOAD_ARGS Undocumented
Constant DEFAULT_SAVE_ARGS Undocumented
Property fs_args Property of optional file system parameters.
Method _describe Undocumented
Method _exists Undocumented
Method _load Undocumented
Method _process_schema This method processes the schema in the catalog.yml or the API, if provided. This assumes that the schema is specified using Triad's grammar for schema definition.
Method _save Undocumented
Instance Variable _credentials Undocumented
Instance Variable _filepath Undocumented
Instance Variable _fs_args Undocumented
Instance Variable _load_args Undocumented
Instance Variable _save_args Undocumented

Inherited from AbstractDataSet:

Class Method from_config Create a data set instance using the configuration provided (see the sketch after this list).
Method __str__ Undocumented
Method exists Checks whether a data set's output already exists by calling the provided _exists() method.
Method load Loads data by delegation to the provided load method.
Method release Release any cached data.
Method save Saves data by delegation to the provided save method.
Method _copy Undocumented
Method _release Undocumented
Property _logger Undocumented
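
For illustration, a sketch of building the dataset through the inherited from_config class method, assuming the usual Kedro signature from_config(name, config); the config dictionary mirrors the YAML entry above and the name "cars" is arbitrary:

>>> from kedro.extras.datasets.dask import ParquetDataSet
>>>
>>> config = {
>>>     "type": "dask.ParquetDataSet",
>>>     "filepath": "s3://bucket_name/path/to/folder",
>>>     "save_args": {"compression": "GZIP"},
>>> }
>>> data_set = ParquetDataSet.from_config("cars", config)
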
def __init__(self, filepath: str, load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None, credentials: Dict[str, Any] = None, fs_args: Dict[str, Any] = None): (source)

Creates a new instance of ParquetDataSet pointing to concrete parquet files.

Parameters
filepath: str
    Filepath in POSIX format to a parquet file, parquet collection or the directory of a multipart parquet.
load_args: Dict[str, Any]
    Additional loading options for dask.dataframe.read_parquet: https://docs.dask.org/en/latest/generated/dask.dataframe.read_parquet.html
save_args: Dict[str, Any]
    Additional saving options for dask.dataframe.to_parquet: https://docs.dask.org/en/latest/generated/dask.dataframe.to_parquet.html
credentials: Dict[str, Any]
    Credentials required to get access to the underlying filesystem. E.g. for GCSFileSystem it should look like {"token": None}.
fs_args: Dict[str, Any]
    Optional parameters to the backend file system driver: https://docs.dask.org/en/latest/how-to/connect-to-remote-data.html#optional-parameters
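
For illustration, a sketch combining these arguments; the specific option values (columns for dask.dataframe.read_parquet, anon for an S3 backend) are assumptions, not requirements:

>>> data_set = ParquetDataSet(
>>>     filepath="s3://bucket_name/path/to/folder",
>>>     load_args={"columns": ["col1", "col2"]},  # passed to dask.dataframe.read_parquet
>>>     save_args={"compression": "GZIP"},  # passed to dask.dataframe.to_parquet
>>>     fs_args={"anon": False},  # optional backend file system parameters
>>> )
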
DEFAULT_LOAD_ARGS: Dict[str, Any] = (source)

Undocumented

Value
{}
DEFAULT_SAVE_ARGS: Dict[str, Any] = (source)

Undocumented

Value
{'write_index': False}
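
A short sketch, assuming the usual Kedro pattern of merging user-supplied save_args over these class defaults:

>>> data_set = ParquetDataSet(
>>>     filepath="s3://bucket_name/path/to/folder",
>>>     save_args={"compression": "GZIP"},
>>> )
>>> # the effective save arguments would then be expected to be
>>> # {"write_index": False, "compression": "GZIP"}
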
@property
fs_args: Dict[str, Any] = (source)

Property of optional file system parameters.

Returns
A dictionary of backend file system parameters, including credentials.
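
A brief sketch of accessing the property; beyond "parameters plus credentials", the exact shape of the returned dictionary is an assumption:

>>> data_set = ParquetDataSet(
>>>     filepath="s3://bucket_name/path/to/folder",
>>>     credentials={"client_kwargs": {"aws_access_key_id": "YOUR_KEY",
>>>                                    "aws_secret_access_key": "YOUR_SECRET"}},
>>>     fs_args={"anon": False},
>>> )
>>> data_set.fs_args  # backend file system parameters, including the credentials
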
def _describe(self) -> Dict[str, Any]: (source)

Undocumented

def _exists(self) -> bool: (source)

Undocumented

def _load(self) -> dd.DataFrame: (source)

Undocumented

def _process_schema(self): (source)

This method processes the schema in the catalog.yml or the API, if provided. This assumes that the schema is specified using Triad's grammar for schema definition.

When the value of the schema variable is a string, it is assumed that it corresponds to the full schema specification for the data.

Alternatively, if the schema is specified as a dictionary, then only the columns that are specified will be strictly mapped to a field type. The other unspecified columns, if present, will be inferred from the data.

This method converts the Triad-parsed schema into a pyarrow schema. The output directly supports Dask's specifications for providing a schema when saving to a parquet file.

Note that if a pa.Schema object is passed directly in the schema argument, no processing is done. In that case the pa.Schema object is assumed to behave as Dask expects, i.e. it should fully define the schema for all fields.
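
A minimal sketch (not the internal implementation) of the string case, assuming Triad's Schema class and its pa_schema property:

>>> from triad import Schema
>>> import pyarrow as pa
>>>
>>> # full schema specification in Triad's grammar; [int32] denotes a list of int32
>>> triad_schema = Schema("col1:int32,col2:int32,col3:[int32]")
>>> pa_schema = triad_schema.pa_schema  # pyarrow.Schema that can be handed to Dask via the schema save_arg
>>> assert pa_schema.field("col3").type == pa.list_(pa.int32())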

def _save(self, data: dd.DataFrame): (source)

Undocumented

_credentials = (source)

Undocumented

_filepath = (source)

Undocumented

_fs_args = (source)

Undocumented

_load_args = (source)

Undocumented

_save_args = (source)

Undocumented