1.

What would happen if we lose RDD partitions due to the failure of the worker node?

Answer»

If any RDD partition is lost, that partition can be recomputed from the original fault-tolerant dataset by replaying its lineage of operations.
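The recomputation idea can be illustrated with a small sketch in plain Python (no Spark required). The names SOURCE_PARTITIONS, LINEAGE and compute_partition are hypothetical and only mimic the concept; Spark's own recovery logic is more involved.

```python
# Hypothetical stand-in for a fault-tolerant source: partition id -> raw records.
SOURCE_PARTITIONS = {0: [1, 2, 3], 1: [4, 5, 6], 2: [7, 8, 9]}

# The lineage: an ordered record of transformations, kept instead of data copies.
LINEAGE = [
    lambda records: [r * 10 for r in records],       # comparable to map(lambda r: r * 10)
    lambda records: [r for r in records if r > 20],  # comparable to filter(lambda r: r > 20)
]


def compute_partition(partition_id):
    """Rebuild one partition by replaying the lineage on its source records."""
    records = SOURCE_PARTITIONS[partition_id]
    for transform in LINEAGE:
        records = transform(records)
    return records


# Materialize every partition, as if each were held by a worker node.
cached = {pid: compute_partition(pid) for pid in SOURCE_PARTITIONS}

# Simulate a worker failure that loses partition 1.
del cached[1]

# Only the lost partition is recomputed; the other partitions are untouched.
cached[1] = compute_partition(1)
print(cached[1])
```

Because the lineage is deterministic, the recomputed partition is identical to the one that was lost.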

Conclusion

PySpark has gained immense popularity among big data and machine learning enthusiasts, as well as in organizations like Netflix, because it exposes Spark functionality in Python and lets developers work with powerful libraries that aid big data processing. In this article, we have seen the most commonly asked interview questions in PySpark that would help developers crack interviews.

2.

What do you understand by PySpark Streaming? How do you stream data using TCP/IP Protocol?

Answer»

PySpark Streaming is a scalable, fault-tolerant, high-throughput stream processing system that supports both streaming and batch workloads and ingests real-time data from sources like TCP sockets, S3, Kafka, Twitter, file system folders etc. The processed data can be sent to live dashboards, Kafka, databases, HDFS etc.

To perform streaming from a TCP socket, we can use the readStream.format("socket") method of the SparkSession object to read data from the socket, providing the streaming source host and port as options, as shown in the code below:

from pyspark.sql import SparkSession

# The application name is arbitrary
spark = SparkSession.builder.appName("SocketStreamExample").getOrCreate()

# Read lines of text from the TCP socket as a streaming DataFrame
df = spark.readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 5555) \
    .load()
df.printSchema()

Spark loads the data from the socket and represents it in the value column of the DataFrame object. The df.printSchema() call prints:

root
 |-- value: string (nullable = true)

After processing, the DataFrame can be streamed to the console or to other destinations such as Kafka, dashboards, databases etc., depending on the requirements.

3.

How can you inner join two DataFrames?

Answer»

We can make use of the join() method present in PySpark SQL. The syntax for the function is:

join(self, other, on=None, how=None)

where,
other - right side of the join
on - column name (string), list of column names, or a join expression used for joining
how - type of join, by default it is inner. The values can be inner, left, right, cross, full, outer, left_outer, right_outer, left_anti, left_semi.

The join expression can be followed by where() and filter() methods for filtering rows. We can also perform multiple joins by chaining join() calls.

Consider we have two dataframes - employee and department as shown below:

-- Employee DataFrame --
+------+--------+----------+
|emp_id|emp_name|empdept_id|
+------+--------+----------+
|1     |Harry   |5         |
|2     |Ron     |5         |
|3     |Neville |10        |
|4     |Malfoy  |20        |
+------+--------+----------+

-- Department DataFrame --
+-------+----------------------+
|dept_id|dept_name             |
+-------+----------------------+
|5      |Information Technology|
|10     |Engineering           |
|20     |Marketting            |
+-------+----------------------+

We can inner join the Employee DataFrame with Department DataFrame to get the department information along with employee information as:

# show() returns None, so keep the joined DataFrame and display it separately
emp_dept_df = empDF.join(deptDF, empDF.empdept_id == deptDF.dept_id, "inner")
emp_dept_df.show(truncate=False)

The result of this becomes:

+------+--------+----------+-------+----------------------+
|emp_id|emp_name|empdept_id|dept_id|dept_name             |
+------+--------+----------+-------+----------------------+
|1     |Harry   |5         |5      |Information Technology|
|2     |Ron     |5         |5      |Information Technology|
|3     |Neville |10        |10     |Engineering           |
|4     |Malfoy  |20        |20     |Marketting            |
+------+--------+----------+-------+----------------------+
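To make the inner join semantics concrete, here is a rough sketch in plain Python (no Spark required). The helper inner_join and the extra employee "Luna" in department 99 are hypothetical additions, included only to show that rows without a match on either side are dropped. Spark chooses its own join strategy internally; the hash lookup below is just one simple way to express the same result.

```python
# Rows mirror the Employee and Department DataFrames, plus one hypothetical
# employee ("Luna") whose department id has no match.
employees = [
    {"emp_id": 1, "emp_name": "Harry", "empdept_id": 5},
    {"emp_id": 2, "emp_name": "Ron", "empdept_id": 5},
    {"emp_id": 3, "emp_name": "Neville", "empdept_id": 10},
    {"emp_id": 4, "emp_name": "Malfoy", "empdept_id": 20},
    {"emp_id": 5, "emp_name": "Luna", "empdept_id": 99},  # hypothetical row
]
departments = [
    {"dept_id": 5, "dept_name": "Information Technology"},
    {"dept_id": 10, "dept_name": "Engineering"},
    {"dept_id": 20, "dept_name": "Marketting"},
]


def inner_join(left_rows, right_rows, left_key, right_key):
    """Return merged rows for every left/right pair whose key values are equal."""
    # Index the right side by key so each left row is matched with a dict lookup.
    index = {}
    for row in right_rows:
        index.setdefault(row[right_key], []).append(row)

    joined = []
    for left in left_rows:
        # A left row with no matching key produces no output row.
        for right in index.get(left[left_key], []):
            joined.append({**left, **right})
    return joined


result = inner_join(employees, departments, "empdept_id", "dept_id")
for row in result:
    print(row)
```

The unmatched employee does not appear in the result, which is the defining property of an inner join.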

We can also perform multiple joins by chaining the join() method, following the syntax:

df1.join(df2, ["column_name"]).join(df3, df1["column_name"] == df3["column_name"]).show()

Consider we have a third dataframe called Address DataFrame having columns emp_id, city and state where emp_id acts as the foreign key equivalent of SQL to the Employee DataFrame as shown below:

-- Address DataFrame --
+------+---------+-----+
|emp_id|city     |state|
+------+---------+-----+
|1     |Bangalore|KA   |
|2     |Pune     |MH   |
|3     |Mumbai   |MH   |
|4     |Chennai  |TN   |
+------+---------+-----+

If we want to get the address details along with the employee and department information, then we can run:

# show() returns None, so keep the joined DataFrame and display it separately
resultDf = (empDF.join(addressDF, ["emp_id"])
    .join(deptDF, empDF["empdept_id"] == deptDF["dept_id"]))
resultDf.show()

The resultDf would be:

+------+--------+----------+---------+-----+-------+----------------------+
|emp_id|emp_name|empdept_id|city     |state|dept_id|dept_name             |
+------+--------+----------+---------+-----+-------+----------------------+
|1     |Harry   |5         |Bangalore|KA   |5      |Information Technology|
|2     |Ron     |5         |Pune     |MH   |5      |Information Technology|
|3     |Neville |10        |Mumbai   |MH   |10     |Engineering           |
|4     |Malfoy  |20        |Chennai  |TN   |20     |Marketting            |
+------+--------+----------+---------+-----+-------+----------------------+
4.

What is PySpark SQL?

Answer»

PySpark SQL is a widely used PySpark module for processing structured columnar data. Once a DataFrame is created, we can interact with the data using SQL syntax. Spark SQL lets us run native raw SQL queries on Spark using select, where, group by, join, union etc. For using PySpark SQL, the first step is to create a temporary table on the DataFrame by using the createOrReplaceTempView() function. After creation, the table is accessible throughout the SparkSession by using the sql() method. When the SparkSession gets terminated, the temporary table will be dropped.
For example, consider we have the following DataFrame assigned to a variable df:

+--------+---+------+
|Name    |Age|Gender|
+--------+---+------+
|Harry   |20 |M     |
|Ron     |20 |M     |
|Hermoine|20 |F     |
+--------+---+------+

In the below piece of code, we create a temporary table from the DataFrame, which then becomes accessible in the SparkSession through the sql() method. SQL queries can be run within that method.

df.createOrReplaceTempView("STUDENTS")
df_new = spark.sql("SELECT * from STUDENTS")
df_new.printSchema()

The schema will be displayed as shown below:

>> df_new.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)

For the above example, let’s try running group by on the Gender column:

groupByGender = spark.sql("SELECT Gender, count(*) as Gender_Count from STUDENTS group by Gender")
groupByGender.show()

The above statements result in:

+------+------------+
|Gender|Gender_Count|
+------+------------+
|F     |1           |
|M     |2           |
+------+------------+
5.

What do you understand by PySpark’s startswith() and endswith() methods?

Answer»

These methods belong to the Column class and are used for searching DataFrame rows by checking if the column value starts with some value or ends with some value. In PySpark they are spelled startswith() and endswith(); startsWith() and endsWith() are the Scala spellings. They are used for filtering data in applications.

  • startswith() – returns a Boolean column. The value is True when the column value starts with the specified string and False when it does not.
  • endswith() – returns a Boolean column. The value is True when the column value ends with the specified string and False when it does not.

Both the methods are case-sensitive.

Consider an example of the startswith() method here. We have created a DataFrame with 3 rows:

data = [('Harry', 20), ('Ron', 20), ('Hermoine', 20)]
columns = ["Name","Age"]
df = spark.createDataFrame(data=data, schema=columns)

The below code returns the rows whose value in the Name column starts with “H”:

from pyspark.sql.functions import col

df.filter(col("Name").startswith("H")).show()

The output of the code would be:

+--------+---+
|Name    |Age|
+--------+---+
|Harry   |20 |
|Hermoine|20 |
+--------+---+

Notice how the record with the Name “Ron” is filtered out because it does not start with “H”.

6.

Is it possible to create PySpark DataFrame from external data sources?

Answer»

Yes, it is! Real-time applications make use of external storage systems like the local file system, HDFS, HBase, MySQL tables, S3, Azure etc. The following example shows how we can create a DataFrame by reading data from a csv file present in the local system:

df = spark.read.csv("/path/to/file.csv")

PySpark supports csv, text, avro, parquet, tsv and many other file formats.

7.

How can we create DataFrames in PySpark?

Answer»

We can do it by making use of the createDataFrame() method of the SparkSession.

data = [('Harry', 20), ('Ron', 20), ('Hermoine', 20)]
columns = ["Name","Age"]
df = spark.createDataFrame(data=data, schema=columns)

This creates the dataframe as shown below:

+--------+---+
|Name    |Age|
+--------+---+
|Harry   |20 |
|Ron     |20 |
|Hermoine|20 |
+--------+---+

We can get the schema of the dataframe by using df.printSchema(). Python integers are inferred as the long type:

>> df.printSchema()
root
 |-- Name: string (nullable = true)
 |-- Age: long (nullable = true)
8.

What are the different approaches for creating RDD in PySpark?

Answer»

The following image represents how we can visualize RDD creation in PySpark:

In the image, we see that the data starts out in list form and, after conversion to an RDD, is stored in different partitions.
We have the following approaches for creating PySpark RDD:

  • Using sparkContext.parallelize(): The parallelize() method of the SparkContext can be used for creating RDDs. This method loads an existing collection from the driver and parallelizes it. This is a basic approach to create an RDD and is used when we have data already present in memory. It also requires all the data to be present on the driver before creating the RDD. Code to create an RDD using the parallelize method for the Python list shown in the image above:
num_list = [1,2,3,4,5,6,7,8,9,10,11,12]
rdd = spark.sparkContext.parallelize(num_list)
  • Using sparkContext.textFile(): Using this method, we can read a .txt file and convert it into an RDD. Syntax:
rdd_txt = spark.sparkContext.textFile("/path/to/textFile.txt")
  • Using sparkContext.wholeTextFiles(): This function returns a PairRDD (an RDD containing key-value pairs) with the file path as the key and the file content as the value.
# Reads each entire file into the RDD as a single record
rdd_whole_text = spark.sparkContext.wholeTextFiles("/path/to/textFile.txt")

We can also read csv, json, parquet and various other formats and create the RDDs.

  • Empty RDD with no partition using sparkContext.emptyRDD(): An RDD with no data is called an empty RDD. We can create such RDDs having no partitions by using the emptyRDD() method as shown in the code piece below:
# Create an empty RDD with no partitions
empty_rdd = spark.sparkContext.emptyRDD()
  • Empty RDD with partitions using sparkContext.parallelize(): When we do not require data but we do require partitions, we create an empty RDD by using the parallelize method as shown below:
# Create empty RDD with 20 partitions
empty_partitioned_rdd = spark.sparkContext.parallelize([], 20)
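How a driver-side list ends up spread over partitions can be pictured with the plain-Python sketch below. The function slice_into_partitions is hypothetical and assumes a simple even-slicing rule; the exact boundaries PySpark picks may differ, and on a real RDD they can be inspected with rdd.glom().collect().

```python
def slice_into_partitions(items, num_partitions):
    """Split a list into num_partitions contiguous, nearly equal slices.

    Assumption: slice i covers the index range [i*n/p, (i+1)*n/p).
    """
    n = len(items)
    return [
        items[(i * n) // num_partitions:((i + 1) * n) // num_partitions]
        for i in range(num_partitions)
    ]


numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
partitions = slice_into_partitions(numbers, 3)
print(partitions)

# An empty collection still yields the requested number of (empty) partitions.
empty_partitions = slice_into_partitions([], 20)
print(len(empty_partitions))
```

The second call mirrors the idea behind parallelize([], 20): the partitions exist even though none of them holds any data.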
9.

How to create SparkSession?

Answer»

To create SparkSession, we use the builder pattern. The builder of the SparkSession class from the pyspark.sql library has the getOrCreate() method, which creates a new SparkSession if there is none or else returns the existing SparkSession object. The following code is an example for creating SparkSession:

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]") \
    .appName('InterviewBitSparkSession') \
    .getOrCreate()

Here,

  • master() – This sets the mode in which the application runs: cluster mode (pass the master URL of the cluster) or local mode. For local mode, we pass the local[x] value to the function, where x is the number of worker threads to use; it also determines the default number of partitions created for RDDs, DataFrames and Datasets. The value of x is ideally the number of CPU cores available.
  • appName() - Used for setting the application name
  • getOrCreate() – For returning SparkSession object. This creates a new object if it does not exist. If an object is there, it simply returns that.

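If we want to create a new SparkSession object every time, we can call the newSession() method on an existing session as shown below. The new session shares the underlying SparkContext but has its own SQL configuration and temporary views.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_session = spark.newSession()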
10.

What are the profilers in PySpark?

Answer»

Custom profilers are supported in PySpark. A profiler collects statistics about the Python code that runs as part of a job, which helps to find performance bottlenecks. When we require a custom profiler, it has to define or inherit the following methods:

  • profile: This produces a system profile of some sort.
  • stats: This returns the collected profiling stats.
  • dump: This dumps the profiles to a specified path.
  • add: This adds a profile to the existing accumulated profile. The profiler class has to be selected at the time of SparkContext creation.
  • dump(id, path): This dumps the profile of a specific RDD id to the path given.
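The shape of such a profiler can be sketched with Python's standard cProfile and pstats modules. The class SimpleProfiler below is hypothetical and does not use or subclass any PySpark class; it only shows what the profile, stats, add and dump methods listed above might look like, under the assumption that profiles are accumulated as pstats.Stats objects.

```python
import cProfile
import os
import pstats
import tempfile


class SimpleProfiler:
    """Hypothetical profiler exposing profile, stats, add and dump."""

    def __init__(self):
        self._stats = None  # accumulated pstats.Stats, None before the first run

    def profile(self, func, *args, **kwargs):
        """Run func under cProfile, accumulate its stats and return its result."""
        profiler = cProfile.Profile()
        result = profiler.runcall(func, *args, **kwargs)
        self.add(pstats.Stats(profiler))
        return result

    def stats(self):
        """Return the stats collected so far."""
        return self._stats

    def add(self, new_stats):
        """Merge a new profile into the existing accumulated profile."""
        if self._stats is None:
            self._stats = new_stats
        else:
            self._stats.add(new_stats)

    def dump(self, path):
        """Write the accumulated profile to the given path."""
        self._stats.dump_stats(path)


def sum_of_squares(n):
    return sum(i * i for i in range(n))


profiler = SimpleProfiler()
total = profiler.profile(sum_of_squares, 1000)
profiler.profile(sum_of_squares, 2000)  # second run is merged through add()

dump_path = os.path.join(tempfile.mkdtemp(), "profile.pstats")
profiler.dump(dump_path)
profiler.stats().sort_stats("cumulative").print_stats(3)
```

The dumped file can later be loaded again with pstats.Stats(dump_path) for offline inspection.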
11.

How will you create PySpark UDF?

Answer»

Consider an example where we want to capitalize the first letter of every word in a string. PySpark's built-in initcap() function offers similar behavior, but the task makes a simple example for a UDF: we create a function capitalizeWord(text) and use it on the DataFrames. The following steps demonstrate this:

  • Create a Python function capitalizeWord that takes a string as input and capitalizes the first character of every word.
def capitalizeWord(text):
    # Upper-case the first character of each space-separated word
    words = text.split(" ")
    return " ".join(word[0:1].upper() + word[1:] for word in words)
  • Register the function as a PySpark UDF by using the udf() function from pyspark.sql.functions, which needs to be imported. It returns a user-defined function object that can be applied to DataFrame columns.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Converting function to UDF
capitalizeWordUDF = udf(lambda z: capitalizeWord(z), StringType())
  • Use the UDF with a DataFrame: The UDF can be applied on a PySpark DataFrame in the same way as a built-in DataFrame function.
    Consider we have a DataFrame stored in the variable df as below:
+---------+----------------+
|ID_COLUMN|NAME_COLUMN     |
+---------+----------------+
|1        |harry potter    |
|2        |ronald weasley  |
|3        |hermoine granger|
+---------+----------------+

To capitalize every first character of the word, we can use:

from pyspark.sql.functions import col

df.select(col("ID_COLUMN"), capitalizeWordUDF(col("NAME_COLUMN")).alias("NAME_COLUMN")) \
    .show(truncate=False)

The output of the above code would be:

+---------+----------------+
|ID_COLUMN|NAME_COLUMN     |
+---------+----------------+
|1        |Harry Potter    |
|2        |Ronald Weasley  |
|3        |Hermoine Granger|
+---------+----------------+

UDFs have to be designed so that their algorithms are efficient in both time and space. If care is not taken, the performance of the DataFrame operations would be impacted.

12.

Why is PySpark SparkConf used?

Answer»

PySpark SparkConf is used for setting the configurations and parameters required to run applications on a cluster or local system. The class signature of SparkConf is:

class pyspark.SparkConf(loadDefaults=True, _jvm=None, _jconf=None)

where:

  • loadDefaults - is of type boolean and indicates whether we require loading values from Java System Properties. It is True by default.
  • _jvm - This belongs to the class py4j.java_gateway.JVMView and is an internal parameter that is used for passing the handle to JVM. This need not be set by the users.
  • _jconf - This belongs to the class py4j.java_gateway.JavaObject. This parameter is optional and can be used for passing an existing SparkConf handle whose parameters should be used.
13.

What is the common workflow of a Spark program?

Answer»

The most common workflow followed by a Spark program is:

  • The first step is to create input RDDs depending on the external data. Data can be obtained from different data sources.
  • After RDD creation, RDD transformation operations like filter() or map() are run for creating new RDDs depending on the business logic.
  • If any intermediate RDDs are required to be reused for later purposes, we can persist those RDDs.
  • Lastly, if any action operations like first(), count() etc. are present, then Spark launches them to initiate parallel computation.
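The workflow above relies on transformations being lazy and actions triggering the work. The plain-Python sketch below (no Spark required) imitates that behavior with generators. The helpers lazy_map and lazy_filter and the trace list are hypothetical stand-ins, not Spark APIs.

```python
trace = []  # records each time a transformation actually touches an element


def lazy_map(func, source):
    """Lazy stand-in for map(): nothing runs until the result is iterated."""
    for item in source:
        trace.append(("map", item))
        yield func(item)


def lazy_filter(predicate, source):
    """Lazy stand-in for filter()."""
    for item in source:
        if predicate(item):
            yield item


# Step 1: create the input data.
numbers = [1, 2, 3, 4, 5, 6]

# Step 2: chain transformations; this only describes the computation.
squares = lazy_map(lambda x: x * x, numbers)
even_squares = lazy_filter(lambda x: x % 2 == 0, squares)
work_before_action = len(trace)  # still 0, because no element has been processed

# Step 3: persist the intermediate result so that it can be reused.
# Materializing the chain is what finally runs the transformations.
persisted = list(even_squares)

# Step 4: run actions against the persisted data.
first_value = persisted[0]    # comparable to first()
value_count = len(persisted)  # comparable to count()
work_after_action = len(trace)

print(work_before_action, work_after_action, first_value, value_count)
```

Without the persisted list, each action would have to rebuild the chain from the input, which is the cost that persisting an RDD avoids.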
14.

What is PySpark DAGScheduler?

Answer»

DAG stands for Directed Acyclic Graph. DAGScheduler constitutes the scheduling layer of Spark, which implements stage-oriented scheduling of tasks using jobs and stages. The logical execution plan (the lineage of dependencies created by transformations on RDDs) is transformed into a physical execution plan consisting of stages. DAGScheduler computes a DAG of stages needed for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule for running the jobs. These stages are then submitted to TaskScheduler for execution. This is represented in the image flow below:

DAGScheduler performs the following three things in Spark:

  • Compute DAG execution for the job.
  • Determine preferred locations for running each task
  • Handle failures caused by shuffle output files being lost.

PySpark’s DAGScheduler follows an event-queue architecture. Here a thread posts events of type DAGSchedulerEvent, such as a new stage or job. The DAGScheduler then reads the stages and sequentially executes them in topological order.
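Executing stages in topological order means that a stage runs only after all of its parent stages have finished. A minimal sketch with Python's standard graphlib module (Python 3.9 or later) is shown below. The stage names and their dependencies are hypothetical, and the real DAGScheduler derives them from RDD dependencies rather than from a hand-written dictionary.

```python
from graphlib import TopologicalSorter

# Hypothetical job: each stage maps to the set of parent stages it depends on.
stage_parents = {
    "stage_0_read_employees": set(),
    "stage_1_read_departments": set(),
    "stage_2_join": {"stage_0_read_employees", "stage_1_read_departments"},
    "stage_3_aggregate": {"stage_2_join"},
}

# static_order() yields every stage after all of its parents.
sorter = TopologicalSorter(stage_parents)
execution_order = list(sorter.static_order())

for position, stage in enumerate(execution_order):
    print(position, stage)
```

The two read stages have no dependency on each other, so their relative order is not fixed; only the parent-before-child constraint is guaranteed.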

15.

What is PySpark Architecture?

Answer»

PySpark, similar to Apache Spark, works in a master-slave architecture pattern. Here, the master node is called the Driver and the slave nodes are called the workers. When a Spark application is run, the Spark Driver creates the SparkContext, which acts as an entry point to the Spark application. All the operations are executed on the worker nodes. The resources required for executing the operations on the worker nodes are managed by the Cluster Managers. The following diagram illustrates the architecture described: