PySpark : PySpark program to write a DataFrame to a Snowflake table.


Overview of Snowflake and PySpark.

Snowflake is a cloud-based data warehousing platform that allows users to store and analyze large volumes of data using SQL. It provides high-performance, scalable, and secure storage and analytics. Snowflake integrates with a variety of programming languages, including Python.

PySpark is the Python API for Apache Spark, a fast and general-purpose cluster computing system. PySpark provides a simple and efficient way to process large amounts of data in parallel.

To write a DataFrame to a Snowflake table using PySpark, we need to perform the following steps:

  1. Configure Spark with the Snowflake Spark connector and JDBC driver (a configuration sketch follows this list)
  2. Create a Spark session and load data into a DataFrame
  3. Write the DataFrame to a Snowflake table using the Snowflake connector
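
Before running the example, Spark needs the Snowflake Spark connector and JDBC driver on its classpath. Here is a minimal sketch of step 1, assuming the packages are pulled from Maven (replace the <version> placeholders with releases that match your Spark and Scala build):

from pyspark.sql import SparkSession

# Pull the Snowflake Spark connector and JDBC driver from Maven.
# Replace <version> with releases matching your Spark/Scala versions.
spark = SparkSession.builder \
    .appName('SnowflakeWrite') \
    .config('spark.jars.packages',
            'net.snowflake:spark-snowflake_2.12:<version>,'
            'net.snowflake:snowflake-jdbc:<version>') \
    .getOrCreate()

The same coordinates can instead be passed on the command line via the --packages flag of spark-submit.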

Here’s an example program that demonstrates how to write a DataFrame to a Snowflake table using PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import os

# Set Snowflake credentials as environment variables
# (placeholder values; in practice, set these outside the script)
os.environ['SNOWFLAKE_USER'] = 'username'
os.environ['SNOWFLAKE_PASSWORD'] = 'password'
os.environ['SNOWFLAKE_ACCOUNT'] = 'account_name'
os.environ['SNOWFLAKE_DATABASE'] = 'database_name'
os.environ['SNOWFLAKE_SCHEMA'] = 'schema_name'
os.environ['SNOWFLAKE_WAREHOUSE'] = 'warehouse_name'

# Create Spark session
spark = SparkSession.builder.appName('SnowflakeWrite').getOrCreate()

# Connection options expected by the Snowflake connector
sf_options = {
    'sfURL': os.environ['SNOWFLAKE_ACCOUNT'] + '.snowflakecomputing.com',
    'sfUser': os.environ['SNOWFLAKE_USER'],
    'sfPassword': os.environ['SNOWFLAKE_PASSWORD'],
    'sfDatabase': os.environ['SNOWFLAKE_DATABASE'],
    'sfSchema': os.environ['SNOWFLAKE_SCHEMA'],
    'sfWarehouse': os.environ['SNOWFLAKE_WAREHOUSE']
}

# Define schema for DataFrame
schema = StructType([
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True),
    StructField('city', StringType(), True)
])

# Load data into DataFrame
data = [
    (1, 'John', 25, 'New York'),
    (2, 'Jane', 30, 'San Francisco'),
    (3, 'Bob', 40, 'Chicago')
]

df = spark.createDataFrame(data, schema)

# Write DataFrame to Snowflake table
# ('snowflake' is the short format name registered by recent connector
# versions; older versions need the full name 'net.snowflake.spark.snowflake')
df.write \
    .format('snowflake') \
    .options(**sf_options) \
    .option('dbtable', 'table_name') \
    .mode('overwrite') \
    .save()

# Read the data back from the Snowflake table
df_from_snowflake = spark.read \
    .format('snowflake') \
    .options(**sf_options) \
    .option('dbtable', 'table_name') \
    .load()

# Show the data from the Snowflake table
df_from_snowflake.show()

In this example program, we set the Snowflake credentials as environment variables, create a Spark session, define a schema for the DataFrame, and load sample data into it. We then write the DataFrame to a Snowflake table, specifying the target table with the dbtable option and passing the connection settings (account URL, user, password, database, schema, and warehouse) through the sf_options dictionary via .options(). Finally, we read the data back from the Snowflake table and display it in the console with .show().

Note that the mode parameter is set to ‘overwrite’ in the df.write statement. This means that if the table already exists, it will be truncated and the new data will replace the existing data. If you want to append data to an existing table, set mode to ‘append’ instead.
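
For example, appending to the same (hypothetical) table_name only changes the mode:

# Append new rows instead of replacing the table
df.write \
    .format('snowflake') \
    .options(**sf_options) \
    .option('dbtable', 'table_name') \
    .mode('append') \
    .save()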
