class documentation
class SparkJDBCDataSet(AbstractDataSet[DataFrame, DataFrame]):
SparkJDBCDataSet loads data from a database table accessible via a JDBC URL and connection properties, and saves the content of a PySpark DataFrame to an external database table via JDBC. It uses pyspark.sql.DataFrameReader and pyspark.sql.DataFrameWriter internally, so it supports all allowed PySpark options on jdbc.
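For orientation, loading and saving conceptually reduce to the raw PySpark jdbc calls below. This is a sketch only, not the data set's actual implementation; the URL, table name, and driver property are taken from the examples that follow.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    url = "jdbc:postgresql://localhost/test"
    properties = {"driver": "org.postgresql.Driver"}

    # Loading roughly corresponds to DataFrameReader.jdbc ...
    df = spark.read.jdbc(url, "weather_table", properties=properties)
    # ... and saving to DataFrameWriter.jdbc.
    df.write.jdbc(url, "weather_table", mode="overwrite", properties=properties)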
Example usage for the YAML API:
weather:
  type: spark.SparkJDBCDataSet
  table: weather_table
  url: jdbc:postgresql://localhost/test
  credentials: db_credentials
  load_args:
    properties:
      driver: org.postgresql.Driver
  save_args:
    properties:
      driver: org.postgresql.Driver
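The db_credentials key above would normally resolve to an entry in the project's credentials configuration. A minimal sketch, assuming a standard Kedro credentials.yml and the user/password values from the Python example below:

    # conf/local/credentials.yml (illustrative)
    db_credentials:
      user: scott
      password: tiger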
Example usage for the Python API:
>>> import pandas as pd
>>>
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>> data = spark.createDataFrame(pd.DataFrame({'col1': [1, 2],
>>>                                            'col2': [4, 5],
>>>                                            'col3': [5, 6]}))
>>> url = 'jdbc:postgresql://localhost/test'
>>> table = 'table_a'
>>> connection_properties = {'driver': 'org.postgresql.Driver'}
>>> data_set = SparkJDBCDataSet(
>>>     url=url, table=table, credentials={'user': 'scott',
>>>                                        'password': 'tiger'},
>>>     load_args={'properties': connection_properties},
>>>     save_args={'properties': connection_properties})
>>>
>>> data_set.save(data)
>>> reloaded = data_set.load()
>>>
>>> assert data.toPandas().equals(reloaded.toPandas())
Method | __init__ | Creates a new SparkJDBCDataSet.
Constant | DEFAULT_LOAD_ARGS | Undocumented
Constant | DEFAULT_SAVE_ARGS | Undocumented
Static Method | _get_spark | Undocumented
Method | _describe | Undocumented
Method | _load | Undocumented
Method | _save | Undocumented
Instance Variable | _load_args | Undocumented
Instance Variable | _save_args | Undocumented
Instance Variable | _table | Undocumented
Instance Variable | _url | Undocumented
Inherited from AbstractDataSet:
Class Method | from_config | Create a data set instance using the configuration provided (see the sketch after this table).
Method | __str__ | Undocumented
Method | exists | Checks whether a data set's output already exists by calling the provided _exists() method.
Method | load | Loads data by delegation to the provided load method.
Method | release | Release any cached data.
Method | save | Saves data by delegation to the provided save method.
Method | _copy | Undocumented
Method | _exists | Undocumented
Method | _release | Undocumented
Property | _logger | Undocumented
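The inherited from_config class method builds the same data set from a plain configuration dictionary. A hedged sketch, reusing the values from the YAML example above; the exact "type" path and its resolution depend on the installed Kedro version:

    from kedro.io import AbstractDataSet

    config = {
        "type": "spark.SparkJDBCDataSet",
        "table": "weather_table",
        "url": "jdbc:postgresql://localhost/test",
        "credentials": {"user": "scott", "password": "tiger"},
        "load_args": {"properties": {"driver": "org.postgresql.Driver"}},
        "save_args": {"properties": {"driver": "org.postgresql.Driver"}},
    }
    # "type" is popped and resolved to the class; the rest are constructor kwargs.
    data_set = AbstractDataSet.from_config("weather", config)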
def __init__(self, url: str, table: str, credentials: Dict[str, Any] = None, load_args: Dict[str, Any] = None, save_args: Dict[str, Any] = None):
Creates a new SparkJDBCDataSet.
Parameters | |
url: str | A JDBC URL of the form jdbc:subprotocol:subname. |
table: str | The name of the table to load data from or save data to. |
credentials: Dict[str, Any] | A dictionary of JDBC database connection arguments, normally at least the properties user and password with their corresponding values. If provided, it updates the properties parameter in load_args and save_args (see the sketch below). |
load_args: Dict[str, Any] | Provided to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. To find all supported arguments, see here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.jdbc.html |
save_args: Dict[str, Any] | Provided to the underlying PySpark jdbc function along with the JDBC URL and the name of the table. To find all supported arguments, see here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameWriter.jdbc.html |
Raises | |
DataSetError | When either url or table is empty or when a property is provided with a None value. |
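As noted for the credentials parameter above, provided credentials are folded into the properties of both load_args and save_args. A minimal sketch of that documented merge behaviour (not the actual source; it assumes credentials take precedence over keys already present in properties):

    credentials = {"user": "scott", "password": "tiger"}
    load_args = {"properties": {"driver": "org.postgresql.Driver"}}

    # Merge connection properties with the credentials, credentials last.
    properties = {**load_args.get("properties", {}), **credentials}
    assert properties == {
        "driver": "org.postgresql.Driver",
        "user": "scott",
        "password": "tiger",
    }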