class PartitionedDataSet(AbstractDataSet):
Known subclasses: kedro.io.IncrementalDataSet
PartitionedDataSet loads and saves partitioned file-like data using the
underlying dataset definition. For filesystem-level operations it uses fsspec:
https://github.com/intake/filesystem_spec.
It also supports advanced features like lazy saving.
Example usage for the YAML API:
station_data:
  type: PartitionedDataSet
  path: data/03_primary/station_data
  dataset:
    type: pandas.CSVDataSet
    load_args:
      sep: '\t'
    save_args:
      sep: '\t'
      index: true
  filename_suffix: '.dat'
Example usage for the Python API:
>>> import pandas as pd
>>> from kedro.io import PartitionedDataSet
>>>
>>> # Create a fake pandas dataframe with 10 rows of data
>>> df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)])
>>>
>>> # Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key
>>> dict_df = {
...     day_of_month: df[df["DAY_OF_MONTH"] == day_of_month]
...     for day_of_month in df["DAY_OF_MONTH"]
... }
>>>
>>> # Save it as small partitions with DAY_OF_MONTH as the partition key
>>> data_set = PartitionedDataSet(
...     path="df_with_partition",
...     dataset="pandas.CSVDataSet",
...     filename_suffix=".csv"
... )
>>> # This will create a folder `df_with_partition` and save multiple files
>>> # with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
>>> data_set.save(dict_df)
>>>
>>> # This will create lazy load functions instead of loading data into memory immediately.
>>> loaded = data_set.load()
>>>
>>> # Load all the partitions
>>> for partition_id, partition_load_func in loaded.items():
...     # The actual function that loads the data
...     partition_data = partition_load_func()
...
...     # Add the processing logic for individual partition HERE
...     print(partition_data)
You can also load multiple partitions from a remote storage and combine them like this:
>>> import pandas as pd
>>> from kedro.io import PartitionedDataSet
>>>
>>> # these credentials will be passed to both 'fsspec.filesystem()' call
>>> # and the dataset initializer
>>> credentials = {"key1": "secret1", "key2": "secret2"}
>>>
>>> data_set = PartitionedDataSet(
...     path="s3://bucket-name/path/to/folder",
...     dataset="pandas.CSVDataSet",
...     credentials=credentials
... )
>>> loaded = data_set.load()
>>> # assert isinstance(loaded, dict)
>>>
>>> combine_all = pd.DataFrame()
>>>
>>> for partition_id, partition_load_func in loaded.items():
...     partition_data = partition_load_func()
...     combine_all = pd.concat(
...         [combine_all, partition_data], ignore_index=True, sort=True
...     )
>>>
>>> new_data = pd.DataFrame({"new": [1, 2]})
>>> # creates "s3://bucket-name/path/to/folder/new/partition.csv"
>>> data_set.save({"new/partition.csv": new_data})
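Lazy saving, mentioned above, relies on the same callable-based contract as lazy loading: a dict value passed to save() may be a zero-argument callable that is only invoked when that partition is actually written. The sketch below illustrates this pattern in plain Python without kedro; the helper name save_partitions is illustrative and not part of the kedro API.

```python
from typing import Callable, Dict, Union

# Illustrative stand-in for a partition payload.
Partition = str


def save_partitions(
    partitions: Dict[str, Union[Partition, Callable[[], Partition]]]
) -> Dict[str, Partition]:
    """Materialize each partition, invoking a value only if it is callable.

    This mirrors the lazy-saving contract: callables are evaluated one at a
    time at save time, so at most one deferred partition's data needs to
    exist in memory at once.
    """
    written = {}
    for partition_id, data in partitions.items():
        written[partition_id] = data() if callable(data) else data
    return written


# An eager value and a lazy one side by side.
result = save_partitions({
    "part-1.csv": "eager data",
    "part-2.csv": lambda: "computed only at save time",
})
```

The benefit appears when partitions are expensive to compute: wrapping each one in a callable defers the work until write time instead of building every partition up front.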
Method | __init__ | Creates a new instance of PartitionedDataSet.
Method | _describe | Undocumented
Method | _exists | Undocumented
Method | _invalidate | Undocumented
Method | _join | Undocumented
Method | _list | Undocumented
Method | _load | Undocumented
Method | _partition | Undocumented
Method | _path | Undocumented
Method | _release | Undocumented
Method | _save | Undocumented
Instance Variable | _credentials | Undocumented
Instance Variable | _dataset | Undocumented
Instance Variable | _dataset | Undocumented
Instance Variable | _filename | Undocumented
Instance Variable | _filepath | Undocumented
Instance Variable | _fs | Undocumented
Instance Variable | _load | Undocumented
Instance Variable | _overwrite | Undocumented
Instance Variable | _partition | Undocumented
Instance Variable | _path | Undocumented
Instance Variable | _protocol | Undocumented
Instance Variable | _sep | Undocumented
Property | _filesystem | Undocumented
Property | _normalized | Undocumented
Inherited from AbstractDataSet:
Class Method | from | Create a data set instance using the configuration provided.
Method | __str__ | Undocumented
Method | exists | Checks whether a data set's output already exists by calling the provided _exists() method.
Method | load | Loads data by delegation to the provided load method.
Method | release | Release any cached data.
Method | save | Saves data by delegation to the provided save method.
Method | _copy | Undocumented
Property | _logger | Undocumented
def __init__(
    self,
    path: str,
    dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]],
    filepath_arg: str = 'filepath',
    filename_suffix: str = '',
    credentials: Dict[str, Any] = None,
    load_args: Dict[str, Any] = None,
    fs_args: Dict[str, Any] = None,
    overwrite: bool = False,
):
overridden in kedro.io.IncrementalDataSet
Creates a new instance of PartitionedDataSet.
Parameters | |
path: str | Path to the folder containing partitioned data. If the path starts with a protocol (e.g., s3://) then the corresponding fsspec concrete filesystem implementation will be used. If no protocol is specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately before using PartitionedDataSet. |
dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]] | Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) an object of a class that inherits from AbstractDataSet, b) a string representing a fully qualified class name to such a class, c) a dictionary with a type key pointing to a string from b); the other keys are passed to the dataset initializer. Credentials for the dataset can be explicitly specified in this configuration. |
filepath_arg: str | Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to "filepath". |
filename_suffix: str | If specified, only partitions that end with this string will be processed. |
credentials: Dict[str, Any] | Protocol-specific options that will be passed to fsspec.filesystem (https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem) and the dataset initializer. If the dataset config contains an explicit credentials spec, that spec takes precedence. All possible credentials management scenarios are documented here: https://kedro.readthedocs.io/en/stable/data/kedro_io.html#partitioned-dataset-credentials |
load_args: Dict[str, Any] | Keyword arguments to be passed into the find() method of the filesystem implementation. |
fs_args: Dict[str, Any] | Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem). |
overwrite: bool | If True, any existing partitions will be removed. |
Raises | |
DataSetError | If versioning is enabled for the underlying dataset. |
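The three accepted formats for the dataset argument can be reduced to a common shape: a dataset type plus initializer keyword arguments. The sketch below shows how forms b) and c) might be normalized; normalize_dataset_config is an illustrative helper, not kedro's internal implementation.

```python
from typing import Any, Dict, Tuple, Union


def normalize_dataset_config(
    dataset: Union[str, Dict[str, Any]]
) -> Tuple[str, Dict[str, Any]]:
    """Illustrative only: split a dataset definition into (type, kwargs).

    Form b) is a bare class-path string with no extra arguments; form c) is
    a dict whose "type" key names the class and whose remaining keys are
    forwarded to the dataset initializer. Form a), an AbstractDataSet
    subclass or instance, is omitted since it needs kedro itself.
    """
    if isinstance(dataset, dict):
        config = dict(dataset)  # copy so the caller's dict is untouched
        dataset_type = config.pop("type")
        return dataset_type, config
    return dataset, {}


# Form b): string only.
type_b, kwargs_b = normalize_dataset_config("pandas.CSVDataSet")

# Form c): dict with a "type" key and initializer arguments.
type_c, kwargs_c = normalize_dataset_config({
    "type": "pandas.CSVDataSet",
    "load_args": {"sep": "\t"},
})
```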
@cachedmethod(cache=operator.attrgetter('_partition_cache'))
def _list_partitions(self) -> List[str]:
overridden in kedro.io.IncrementalDataSet
Undocumented
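The @cachedmethod decorator above (from the cachetools library) memoizes the partition listing per instance, so the filesystem is only scanned once until the cache is invalidated. A stdlib-only analogue of that idea, using functools.cached_property, is sketched below; PartitionLister is a hypothetical class for illustration, not kedro's code.

```python
import functools


class PartitionLister:
    """Illustrative stand-in: cache an expensive partition listing per instance."""

    def __init__(self, partitions):
        self._partitions = partitions
        self.list_calls = 0  # counts how often the underlying listing runs

    @functools.cached_property
    def partition_ids(self):
        # In PartitionedDataSet this step would hit the filesystem; here we
        # just sort the stored names once, then serve the cached result.
        self.list_calls += 1
        return sorted(self._partitions)


lister = PartitionLister(["2.csv", "1.csv"])
first = lister.partition_ids   # computes and caches the listing
second = lister.partition_ids  # served from the cache, no recomputation
```

Caching the listing matters on remote filesystems, where enumerating partitions can be a slow network call that load() and exists() would otherwise repeat.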