class documentation

class SparkDataSet(AbstractVersionedDataSet[DataFrame, DataFrame]): (source)


SparkDataSet loads and saves Spark dataframes.

Example usage for the YAML API:

weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather_with_schema:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    schema:
      filepath: path/to/schema.json
  save_args:
    sep: '|'
    header: True

weather_cleaned:
  type: spark.SparkDataSet
  filepath: data/02_intermediate/data.parquet
  file_format: parquet

Example usage for the Python API:

>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
>>>                                IntegerType, StructType)
>>>
>>> from kedro.extras.datasets.spark import SparkDataSet
>>>
>>> schema = StructType([StructField("name", StringType(), True),
>>>                      StructField("age", IntegerType(), True)])
>>>
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>>
>>> spark_df = SparkSession.builder.getOrCreate() \
>>>                        .createDataFrame(data, schema)
>>>
>>> data_set = SparkDataSet(filepath="test_data")
>>> data_set.save(spark_df)
>>> reloaded = data_set.load()
>>>
>>> reloaded.take(4)
Method __init__ Creates a new instance of SparkDataSet.
Constant DEFAULT_LOAD_ARGS Undocumented
Constant DEFAULT_SAVE_ARGS Undocumented
Static Method _get_spark Undocumented
Static Method _load_schema_from_file Undocumented
Method _describe Undocumented
Method _exists Undocumented
Method _handle_delta_format Undocumented
Method _load Undocumented
Method _save Undocumented
Constant _SINGLE_PROCESS Undocumented
Instance Variable _file_format Undocumented
Instance Variable _fs_prefix Undocumented
Instance Variable _load_args Undocumented
Instance Variable _save_args Undocumented
Instance Variable _schema Undocumented

Inherited from AbstractVersionedDataSet:

Method exists Checks whether a data set's output already exists by calling the provided _exists() method.
Method load Loads data by delegation to the provided load method.
Method resolve_load_version Compute the version the dataset should be loaded with.
Method resolve_save_version Compute the version the dataset should be saved with.
Method save Saves data by delegation to the provided save method.
Method _fetch_latest_load_version Undocumented
Method _fetch_latest_save_version Generate and cache the current save version
Method _get_load_path Undocumented
Method _get_save_path Undocumented
Method _get_versioned_path Undocumented
Method _release Undocumented
Instance Variable _exists_function Undocumented
Instance Variable _filepath Undocumented
Instance Variable _glob_function Undocumented
Instance Variable _version Undocumented
Instance Variable _version_cache Undocumented

Inherited from AbstractDataSet (via AbstractVersionedDataSet):

Class Method from_config Create a data set instance using the configuration provided.
Method __str__ Undocumented
Method release Release any cached data.
Method _copy Undocumented
Property _logger Undocumented
def __init__(self, filepath: str, file_format: str = 'parquet', load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None, version: Version = None, credentials: Dict[str, Any] = None): (source)

Creates a new instance of SparkDataSet.

Parameters
filepath (str): Filepath in POSIX format to a Spark dataframe. When using Databricks and working with data written to mount path points, specify filepaths for (versioned) SparkDataSets starting with /dbfs/mnt.
file_format (str): File format used during load and save operations. Formats supported by the running SparkContext include parquet, csv and delta. For a list of supported formats please refer to the Apache Spark documentation at https://spark.apache.org/docs/latest/sql-programming-guide.html
load_args (Dict[str, Any]): Load args passed to the Spark DataFrameReader load method. These depend on the selected file format. You can find a list of read options for each supported format in the Spark DataFrame read documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
save_args (Dict[str, Any]): Save args passed to the Spark DataFrame write options. As with load_args, these depend on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively. You can find a list of options for each format in the Spark DataFrame write documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
version (Version): If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated.
credentials (Dict[str, Any]): Credentials to access the S3 bucket, such as key and secret, if the filepath prefix is s3a:// or s3n://. Optional keyword arguments passed to hdfs.client.InsecureClient if the filepath prefix is hdfs://. Ignored otherwise.
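
As an illustration only, here is a hedged sketch of how these parameters could fit together when constructing a versioned dataset against an S3 bucket; the bucket name, partition column and credential values are placeholders, not part of the API:

>>> from kedro.extras.datasets.spark import SparkDataSet
>>> from kedro.io.core import Version
>>>
>>> data_set = SparkDataSet(
>>>     filepath="s3a://your_bucket/data/02_intermediate/weather.parquet",
>>>     file_format="parquet",
>>>     # save_args are handed to the Spark DataFrame write options
>>>     save_args={"mode": "overwrite", "partitionBy": ["year"]},
>>>     # load the latest version, autogenerate the save version
>>>     version=Version(load=None, save=None),
>>>     credentials={"key": "<aws-access-key-id>", "secret": "<aws-secret-access-key>"},
>>> )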
DEFAULT_LOAD_ARGS: Dict[str, Any] = (source)

Undocumented

Value
{}
DEFAULT_SAVE_ARGS: Dict[str, Any] = (source)

Undocumented

Value
{}
@staticmethod
def _get_spark(): (source)

Undocumented

@staticmethod
def _load_schema_from_file(schema: Dict[str, Any]) -> StructType: (source)

Undocumented

def _describe(self) -> Dict[str, Any]: (source)

Undocumented

def _exists(self) -> bool: (source)

Undocumented

def _handle_delta_format(self): (source)

Undocumented

def _load(self) -> DataFrame: (source)

Undocumented

def _save(self, data: DataFrame): (source)

Undocumented

_SINGLE_PROCESS: bool = (source)

Undocumented

Value
True
_file_format = (source)

Undocumented

_fs_prefix = (source)

Undocumented

_load_args = (source)

Undocumented

_save_args = (source)

Undocumented

_schema = (source)

Undocumented