class documentation
class SparkDataSet(AbstractVersionedDataSet[DataFrame, DataFrame]):
SparkDataSet loads and saves Spark dataframes.
Example usage for the YAML API:
weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

weather_with_schema:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather/*
  file_format: csv
  load_args:
    header: True
    schema:
      filepath: path/to/schema.json
  save_args:
    sep: '|'
    header: True

weather_cleaned:
  type: spark.SparkDataSet
  filepath: data/02_intermediate/data.parquet
  file_format: parquet
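The schema.json file referenced above holds a Spark StructType serialised to JSON, which the dataset parses back into a schema at load time. A minimal sketch of one way to produce such a file with PySpark (the path and field names are illustrative, not part of the API):

    import json

    from pyspark.sql.types import IntegerType, StringType, StructField, StructType

    # Illustrative schema; replace the fields with those of your data.
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True),
    ])

    # StructType.jsonValue() returns a JSON-serialisable dict of the schema.
    with open("path/to/schema.json", "w", encoding="utf-8") as f:
        json.dump(schema.jsonValue(), f)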
Example usage for the Python API:
>>> from pyspark.sql import SparkSession
>>> from pyspark.sql.types import (StructField, StringType,
...                                IntegerType, StructType)
>>>
>>> from kedro.extras.datasets.spark import SparkDataSet
>>>
>>> schema = StructType([StructField("name", StringType(), True),
...                      StructField("age", IntegerType(), True)])
>>>
>>> data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]
>>>
>>> spark_df = SparkSession.builder.getOrCreate() \
...     .createDataFrame(data, schema)
>>>
>>> data_set = SparkDataSet(filepath="test_data")
>>> data_set.save(spark_df)
>>> reloaded = data_set.load()
>>>
>>> reloaded.take(4)
Method | __init__ | Creates a new instance of SparkDataSet.
Constant | DEFAULT_LOAD_ARGS | Undocumented
Constant | DEFAULT_SAVE_ARGS | Undocumented
Static Method | _get_spark | Undocumented
Static Method | _load_schema_from_file | Undocumented
Method | _describe | Undocumented
Method | _exists | Undocumented
Method | _handle_delta_format | Undocumented
Method | _load | Undocumented
Method | _save | Undocumented
Constant | _SINGLE_PROCESS | Undocumented
Instance Variable | _file_format | Undocumented
Instance Variable | _fs_prefix | Undocumented
Instance Variable | _load_args | Undocumented
Instance Variable | _save_args | Undocumented
Instance Variable | _schema | Undocumented
Inherited from AbstractVersionedDataSet:
Method | exists | Checks whether a data set's output already exists by calling the provided _exists() method.
Method | load | Loads data by delegation to the provided load method.
Method | resolve_load_version | Compute the version the dataset should be loaded with.
Method | resolve_save_version | Compute the version the dataset should be saved with.
Method | save | Saves data by delegation to the provided save method.
Method | _fetch_latest_load_version | Undocumented
Method | _fetch_latest_save_version | Generate and cache the current save version.
Method | _get_load_path | Undocumented
Method | _get_save_path | Undocumented
Method | _get_versioned_path | Undocumented
Method | _release | Undocumented
Instance Variable | _exists_function | Undocumented
Instance Variable | _filepath | Undocumented
Instance Variable | _glob_function | Undocumented
Instance Variable | _version | Undocumented
Instance Variable | _version_cache | Undocumented
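The versioned load and save behaviour summarised above can be exercised directly. A hedged sketch, assuming an illustrative local path and an explicit save version (the timestamp format mirrors Kedro's autogenerated versions):

    from kedro.extras.datasets.spark import SparkDataSet
    from kedro.io.core import Version

    # load=None resolves to the latest available version at load time.
    data_set = SparkDataSet(
        filepath="data/02_intermediate/data.parquet",
        version=Version(load=None, save="2023-01-01T00.00.00.000Z"),
    )
    print(data_set.resolve_save_version())  # the explicit save version above
    print(data_set.exists())                # True once a version has been saved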
Inherited from AbstractDataSet (via AbstractVersionedDataSet):
Class Method | from_config | Create a data set instance using the configuration provided.
Method | __str__ | Undocumented
Method | release | Release any cached data.
Method | _copy | Undocumented
Property | _logger | Undocumented
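The inherited from_config factory builds a dataset from the same dictionary shape used by the YAML API above. A minimal sketch, reusing the illustrative entry name and values from the earlier example (the "type" key is required in the config):

    from kedro.extras.datasets.spark import SparkDataSet

    # Config mirrors the "weather" YAML entry shown earlier.
    data_set = SparkDataSet.from_config(
        name="weather",
        config={
            "type": "spark.SparkDataSet",
            "filepath": "s3a://your_bucket/data/01_raw/weather/*",
            "file_format": "csv",
            "load_args": {"header": True, "inferSchema": True},
        },
    )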
def __init__(
    self,
    filepath: str,
    file_format: str = 'parquet',
    load_args: Dict[str, Any] = None,
    save_args: Dict[str, Any] = None,
    version: Version = None,
    credentials: Dict[str, Any] = None,
):
Creates a new instance of SparkDataSet.
Parameters:
filepath: str | Filepath in POSIX format to a Spark dataframe. When using Databricks and working with data written to mount path points, specify filepaths for (versioned) SparkDataSets starting with /dbfs/mnt.
file_format: str | File format used during load and save operations. These are the formats supported by the running SparkContext and include parquet, csv and delta. For a list of supported formats, refer to the Apache Spark documentation: https://spark.apache.org/docs/latest/sql-programming-guide.html
load_args: Dict[str, Any] | Load args passed to the Spark DataFrameReader load method. They depend on the selected file format. You can find a list of read options for each supported format in the Spark DataFrame read documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
save_args: Dict[str, Any] | Save args passed to the Spark DataFrame write options. As with load_args, they depend on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively. You can find a list of options for each format in the Spark DataFrame write documentation: https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html
version: Version | If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated.
credentials: Dict[str, Any] | Credentials to access the S3 bucket, such as key and secret, if the filepath prefix is s3a:// or s3n://. Optional keyword arguments passed to hdfs.client.InsecureClient if the filepath prefix is hdfs://. Ignored otherwise.
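Putting these arguments together, a minimal sketch of constructing the dataset directly; the bucket, partition column and credential values are illustrative placeholders:

    from kedro.extras.datasets.spark import SparkDataSet
    from kedro.io.core import Version

    data_set = SparkDataSet(
        filepath="s3a://your_bucket/data/02_intermediate/weather.parquet",
        file_format="parquet",
        # mode and partitionBy are passed through to DataFrame.write.
        save_args={"mode": "overwrite", "partitionBy": ["year"]},
        # load=None loads the latest version; save=None autogenerates one.
        version=Version(load=None, save=None),
        # Used because the filepath prefix is s3a://; ignored for local paths.
        credentials={"key": "YOUR_ACCESS_KEY", "secret": "YOUR_SECRET_KEY"},
    )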