PySpark: Interview Questions (Coding) — Part 1
Navigating through a PySpark interview can be a challenging endeavor, especially when faced with scenario-based questions that test not only one’s theoretical knowledge but also practical problem-solving skills.
In this article, I'll share several questions I personally encountered during interviews, along with other important ones. I believe these questions will give you a sense of the types of questions asked, whether in online assessments or face-to-face interviews.
Q1. ClickStream
Given a clickstream of user activity data, find the relevant user session for each click event.
click_time | user_id
2018-01-01 11:00:00 | u1
2018-01-01 12:00:00 | u1
2018-01-01 13:00:00 | u1
2018-01-01 13:00:00 | u1
2018-01-01 14:00:00 | u1
2018-01-01 15:00:00 | u1
2018-01-01 11:00:00 | u2
2018-01-02 11:00:00 | u2
Session definition:
1. A session expires after 30 minutes of inactivity (no clickstream events are generated during inactivity).
2. A session remains active for a maximum total of 2 hours.
Sol -
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, lag, when, lit, concat, sum
from pyspark.sql.window import Window
# Create a SparkSession
spark = SparkSession.builder \
.appName("ClickStreamSession") \
.getOrCreate()
# Define the schema for the clickstream data
schema = "click_time STRING, user_id STRING"
# Sample clickstream data
data = [
("2018-01-01 11:00:00", "u1"),
("2018-01-01 12:00:00", "u1"),
("2018-01-01 13:00:00", "u1"),
("2018-01-01 13:00:00", "u1"),
("2018-01-01 14:00:00", "u1"),
("2018-01-01 15:00:00", "u1"),
("2018-01-01 11:00:00", "u2"),
("2018-01-02 11:00:00", "u2")
]
# Create a DataFrame from the given sample data
clickstream_df = spark.createDataFrame(data, schema=schema)
# Convert click_time to Unix timestamp for easier calculations
clickstream_df = clickstream_df.withColumn("click_timestamp", unix_timestamp("click_time"))
session_window = Window.partitionBy("user_id").orderBy("click_timestamp")
# Previous click time for each user using lag
clickstream_df = clickstream_df.withColumn("prev_click_timestamp", lag("click_timestamp", 1).over(session_window))
# Difference from the previous click, converted to minutes
clickstream_df = clickstream_df.withColumn("timestamp_diff", (col("click_timestamp")-col("prev_click_timestamp"))/60)
# Replace null (a user's first click) with 0
clickstream_df = clickstream_df.withColumn("timestamp_diff", when(col("timestamp_diff").isNull(), 0).otherwise(col("timestamp_diff")))
# Flag a new session when inactivity exceeds 30 minutes
clickstream_df = clickstream_df.withColumn("session_new", when(col("timestamp_diff") > 30, 1).otherwise(0))
# Session name per user: running sum of the new-session flags
clickstream_df = clickstream_df.withColumn("session_new_name", concat(col("user_id"), lit("--S"), sum(col("session_new")).over(session_window)))
clickstream_df.show()
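Note: this solution implements the 30-minute inactivity rule; enforcing the 2-hour cap would need an additional check on total session duration. As an alternative sketch (assuming Spark 3.2+), the built-in session_window function can derive the same 30-minute sessions directly; it is referenced as F.session_window here to avoid clashing with the session_window WindowSpec defined above:
# Alternative sketch (Spark 3.2+): built-in session windows with a 30-minute gap
from pyspark.sql import functions as F
sessions_df = clickstream_df \
    .groupBy("user_id", F.session_window(F.col("click_time").cast("timestamp"), "30 minutes")) \
    .agg(F.count("*").alias("clicks"))
sessions_df.show(truncate=False)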
Q2. Max Salary
The task is to find the job titles of the highest-paid employees. The output should include the highest-paid title, or multiple titles if they share the same top salary.
Input —
worker:
|worker_id|first_name|last_name|salary|joining_date| department|
| 1| John| Doe| 10000| 2023-01-01|Engineering|
| 2| Jane| Smith| 12000| 2022-12-01| Marketing|
| 3| Alice| Johnson| 12000| 2022-11-01|Engineering|
title:
|worker_ref_id|worker_title|affected_from|
| 1| Engineer| 2022-01-01|
| 2| Manager| 2022-01-01|
| 3| Engineer| 2022-01-01|
Output —
|worker_id|first_name|last_name|best_paid_title|salary|
| 3| Alice| Johnson| Engineer| 12000|
| 2| Jane| Smith| Manager| 12000|
Sol -
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import rank
spark = SparkSession.builder \
.appName("HighestPaidJobTitles") \
.getOrCreate()
worker_data = [(1, 'John', 'Doe', 10000, '2023-01-01', 'Engineering'),
(2, 'Jane', 'Smith', 12000, '2022-12-01', 'Marketing'),
(3, 'Alice', 'Johnson', 12000, '2022-11-01', 'Engineering')]
columns = ['worker_id', 'first_name', 'last_name', 'salary', 'joining_date', 'department']
worker = spark.createDataFrame(worker_data, columns)
title_data = [(1, 'Engineer', '2022-01-01'),
(2, 'Manager', '2022-01-01'),
(3, 'Engineer', '2022-01-01')]
columns = ['worker_ref_id', 'worker_title', 'affected_from']
title = spark.createDataFrame(title_data, columns)
joined_df = worker.join(title, worker.worker_id == title.worker_ref_id)
ranked_df = joined_df.withColumn("salary_rank", rank().over(Window.orderBy(joined_df["salary"].desc())))
highest_paid_df = ranked_df.filter(ranked_df["salary_rank"] == 1)
result_df = highest_paid_df.select("worker_id", "first_name", "last_name", "worker_title", "salary").withColumnRenamed('worker_title', 'best_paid_title')
result_df.show()
spark.stop()
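For reference, a rough Spark SQL equivalent (a sketch; it assumes the worker and title DataFrames above are registered as temp views and that it runs before spark.stop() is called):
# Spark SQL sketch of the same logic
worker.createOrReplaceTempView("worker")
title.createOrReplaceTempView("title")
spark.sql("""
    SELECT w.worker_id, w.first_name, w.last_name,
           t.worker_title AS best_paid_title, w.salary
    FROM worker w
    JOIN title t ON w.worker_id = t.worker_ref_id
    WHERE w.salary = (SELECT MAX(salary) FROM worker)
""").show()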
Q3. Highest and Lowest Salary
You have been asked to find the employees with the highest and lowest salary from the below sample data for the worker and title tables. The output includes a column salary_type that categorizes the rows as follows:
'Highest Salary' represents the highest salary
'Lowest Salary' represents the lowest salary
worker:
|worker_id|first_name|last_name|salary|joining_date| department|
| 1| John| Doe| 5000| 2023-01-01|Engineering|
| 2| Jane| Smith| 6000| 2023-01-15| Marketing|
| 3| Alice| Johnson| 4500| 2023-02-05|Engineering|
title:
|worker_ref_id|worker_title|affected_from|
| 1| Engineer| 2022-01-01|
| 2| Manager| 2022-01-01|
| 3| Engineer| 2022-01-01|
Sol -
from pyspark.sql import SparkSession
from pyspark.sql.functions import max, min, when, col
from pyspark.sql.window import Window
spark = SparkSession.builder \
.appName("HighestLowestSalaryEmployees") \
.getOrCreate()
worker_data = [
(1, 'John', 'Doe', 5000, '2023-01-01', 'Engineering'),
(2, 'Jane', 'Smith', 6000, '2023-01-15', 'Marketing'),
(3, 'Alice', 'Johnson', 4500, '2023-02-05', 'Engineering')
]
title_data = [
(1, 'Engineer', '2022-01-01'),
(2, 'Manager', '2022-01-01'),
(3, 'Engineer', '2022-01-01')
]
worker_columns = ['worker_id', 'first_name', 'last_name', 'salary', 'joining_date', 'department']
title_columns = ['worker_ref_id', 'worker_title', 'affected_from']
worker_df = spark.createDataFrame(worker_data, worker_columns)
title_df = spark.createDataFrame(title_data, title_columns)
worker_df.show()
title_df.show()
joined_df = worker_df.join(title_df, worker_df.worker_id == title_df.worker_ref_id, "inner")
# Overall highest and lowest salary across all employees (window over the full dataset)
salary_window = Window.partitionBy()
result_df = joined_df \
    .withColumn("max_salary", max("salary").over(salary_window)) \
    .withColumn("min_salary", min("salary").over(salary_window))
result_df = result_df.withColumn("salary_type",
    when(result_df["salary"] == result_df["max_salary"], "Highest Salary")
    .when(result_df["salary"] == result_df["min_salary"], "Lowest Salary")
    .otherwise(None)) \
    .filter(col("salary_type").isNotNull()) \
    .select("worker_id", "first_name", "last_name", "department", "salary", "salary_type")
result_df.show()
spark.stop()
Q4. Pivot the Column into Rows
Given Input -
StudentID, StudentName , AScore, BScore,CScore
123, A, 30, 31, 32
124, B, 40, 41, 42
Get the output in below format -
StudentID, StudentName , Subject , Score
123, A, AScore, 30
123, A, BScore, 31
123, A, CScore, 32
124, B, AScore, 40
124, B, BScore, 41
124, B, CScore, 42
Sol -
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("Transform Data") \
.getOrCreate()
data = [
(123, "A", 30, 31, 32),
(124, "B", 40, 41, 42),
(125, "B", 50, 51, 52)
]
df = spark.createDataFrame(data, ["StudentID", "StudentName", "AScore", "BScore", "CScore"])
pivot_df = df.selectExpr(
"StudentID",
"StudentName",
"stack(3, 'AScore', AScore, 'BScore', BScore, 'CScore', CScore) as (Subject, Score)"
)
pivot_df.show()
Note: "stack(3, 'AScore', AScore, 'BScore', BScore, 'CScore', CScore)"
: This part uses the stack
function to pivot the DataFrame. The first argument 3
indicates the number of key-value pairs to stack. The subsequent arguments are key-value pairs where the first argument is the key (in this case, column name) and the second argument is the value (the column value). So, for each row, three key-value pairs are created: ('AScore', AScore)
, ('BScore', BScore)
, and ('CScore', CScore)
. The stack
function stacks these key-value pairs into rows
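As an alternative (a sketch, assuming Spark 3.4+), the DataFrame.unpivot method produces the same output without writing a stack expression:
# Unpivot sketch (Spark 3.4+)
pivot_df = df.unpivot(
    ["StudentID", "StudentName"],   # id columns kept as-is
    ["AScore", "BScore", "CScore"], # columns to turn into rows
    "Subject",                      # name of the new key column
    "Score"                         # name of the new value column
)
pivot_df.show()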
Q5. Repeat IDs
Given Input —
id
1
2
3
Output —
id
1
2
2
3
3
3
Sol -
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("Repeat ID") \
.getOrCreate()
df = spark.createDataFrame([(1,), (2,), (3,)], ["id"])
output_df = df.selectExpr("explode(sequence(1, id)) as id")
output_df.show()
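A variant of the same idea (a sketch) repeats each id value id times with array_repeat and then explodes the array; the cast keeps the repeat count an integer as the function expects:
# array_repeat sketch: id repeated id times, then exploded into rows
output_df = df.selectExpr("explode(array_repeat(id, cast(id as int))) as id")
output_df.show()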
Q6. Group Rows
Input —
col1|col2|col3
alpha| aa| 1
alpha| aa| 2
beta| bb| 3
beta| bb| 4
beta| bb| 5
Output —
col1|col2|col3_list
alpha| aa| [1, 2]
beta| bb|[3, 4, 5]
Sol -
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("HighestLowestSalaryEmployees") \
.getOrCreate()
data = [("alpha", "aa", 1),
("alpha", "aa", 2),
("beta", "bb", 3),
("beta", "bb", 5),
("beta", "bb", 4)]
schema = ["col1", "col2", "col3"]
df = spark.createDataFrame(data, schema=schema)
df.show()
df_grouped = df.groupBy("col1", "col2").agg(collect_list("col3").alias("col3_list"))
df_grouped.show()
spark.stop()
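One caveat: collect_list does not guarantee the order of the collected values. To reliably match the sorted lists in the expected output, the aggregation can be wrapped in sort_array (run before spark.stop()):
# Sorted variant: deterministic ordering inside each list
df_grouped = df.groupBy("col1", "col2").agg(sort_array(collect_list("col3")).alias("col3_list"))
df_grouped.show()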
Q7. JSON Data
Read the below JSON file —
[
{
"dept_id": 102,
"e_id": [
10201,
10202
]
},
{
"dept_id": 101,
"e_id": [
10101,
10102,
10103
]
}
]
Output —
dept_id | e_id
101 | 10101
101 | 10102
101 | 10103
102 | 10201
102 | 10202
Sol -
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("ReadJSON") \
.getOrCreate()
df = spark.read.option("multiline", "true").json('sample_data.json')
df_exploded = df.selectExpr("dept_id", "explode(e_id) as e_id")
# df_exploded = df.select("dept_id", explode('e_id').alias('e_id'))
df_exploded.show()
Note:
You need to use option("multiline", "true"); otherwise, you will get the below exception:
AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).
This is because, by default, PySpark treats every record in a JSON file as a fully qualified record on a single line. The PySpark JSON data source provides the multiline option to read records that span multiple lines.
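Optionally, an explicit schema can be supplied so Spark does not have to infer it from the file (a sketch; the field types are assumed from the sample data):
# Explicit schema sketch for the multiline JSON read
from pyspark.sql.types import StructType, StructField, LongType, ArrayType
json_schema = StructType([
    StructField("dept_id", LongType(), True),
    StructField("e_id", ArrayType(LongType()), True)
])
df = spark.read.option("multiline", "true").schema(json_schema).json("sample_data.json")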
Q8. Employee Total Hours Inside
From the below data, find the total number of hours each employee was inside the office.
Input —
emp_id| punch_time|flag
11114|1900-01-01 08:30:00| I
11114|1900-01-01 10:30:00| O
11114|1900-01-01 11:30:00| I
11114|1900-01-01 15:30:00| O
11115|1900-01-01 09:30:00| I
11115|1900-01-01 17:30:00| O
Sol —
import datetime
from pyspark.sql.types import StructType, StructField, TimestampType, LongType, StringType
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("TotalInTime") \
.getOrCreate()
_data = [
(11114, datetime.datetime.strptime('08:30:00.00', "%H:%M:%S.%f"), "I"),
(11114, datetime.datetime.strptime('10:30:00.00', "%H:%M:%S.%f"), 'O'),
(11114, datetime.datetime.strptime('11:30:00.00', "%H:%M:%S.%f"), 'I'),
(11114, datetime.datetime.strptime('15:30:00.00', "%H:%M:%S.%f"), 'O'),
(11115, datetime.datetime.strptime('09:30:00.00', "%H:%M:%S.%f"), 'I'),
(11115, datetime.datetime.strptime('17:30:00.00', "%H:%M:%S.%f"), 'O')
]
# Schema
_schema = StructType([
StructField('emp_id', LongType(), True),
StructField('punch_time', TimestampType(), True),
StructField('flag', StringType(), True)
])
df = spark.createDataFrame(data=_data, schema=_schema)
window_agg = Window.partitionBy('emp_id').orderBy(col('punch_time'))
df = df.withColumn('prev_time', lag(col('punch_time')).over(window_agg))
df = df.withColumn('time_diff', (col('punch_time').cast('long') - col('prev_time').cast('long'))/3600)
df = df.groupBy('emp_id').agg(sum(when(col('flag') == 'O', col('time_diff')).otherwise(0)).alias('total_time'))
df.show()
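For the sample data, this gives a total_time of 6.0 hours for employee 11114 (2 hours plus 4 hours inside) and 8.0 hours for employee 11115.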
Q9. Employee with Manager
From the given dataset, fetch each manager along with the count of employees reporting to them.
Input —
employee_id|first_name|manager_id
4529| Nancy| 4125
4238| John| 4329
4329| Martina| 4125
4009| Klaus| 4329
4125| Mafalda| NULL
4500| Jakub| 4529
4118| Moira| 4952
4012| Jon| 4952
4952| Sandra| 4529
4444| Seamus| 4329
Output —
manager_id|manager_name|count
4125| Mafalda| 2
4952| Sandra| 2
4329| Martina| 3
4529| Nancy| 2
Sol —
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName("EmployeesWithManagers") \
.getOrCreate()
data = [('4529', 'Nancy', '4125'),
('4238','John', '4329'),
('4329', 'Martina', '4125'),
('4009', 'Klaus', '4329'),
('4125', 'Mafalda', 'NULL'),
('4500', 'Jakub', '4529'),
('4118', 'Moira', '4952'),
('4012', 'Jon', '4952'),
('4952', 'Sandra', '4529'),
('4444', 'Seamus', '4329')]
schema = ['employee_id', 'first_name', 'manager_id']
df = spark.createDataFrame(data=data, schema=schema)
# Self-join the DataFrame to get manager names
result_df = df.alias("e").join(df.alias("m"), col("e.manager_id") == col("m.employee_id"), "inner") \
.select(col("e.employee_id"), col("e.first_name"),
col("e.manager_id"), col("m.first_name").alias("manager_name"))
result_df.groupBy("manager_id", "manager_name").count().show()
spark.stop()
Q10. Count the Frequency of each Word
Write a word count function using PySpark that covers the following steps —
1. read the data from the input.txt file
2. lower the words
3. remove any punctuations
4. Remove the None, blank strings and digits
5. sort the words in descending order of count
6. write the result to a CSV file
Test Input —
test_data = [
"Hello world! This is a test.",
"Spark is awesome, isn't it?",
"Test test test."
"Test 0 21."
]
Output —
word|count
test| 4
is| 2
a| 1
awesome| 1
spark| 1
this| 1
world| 1
hello| 1
isn| 1
it| 1
Sol —
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
def word_count(output_file_path):
    # Initialize SparkSession
    spark = SparkSession.builder \
        .appName("WordCount") \
        .getOrCreate()
    # Test data (note: there is no comma between the last two strings, so Python
    # concatenates them into a single line, matching the test input above)
    test_data = [
        "Hello world! This is a test.",
        "Spark is awesome, isn't it?",
        "Test test test."
        "Test 0 21."
    ]
    # Create DataFrame from test data
    input_df = spark.createDataFrame(test_data, StringType())
    input_df = input_df.select(split(col("value"), " ").alias("line"))
    input_df = input_df.select(explode(col("line")).alias("value"))
    # Lower-case the words
    input_df = input_df.withColumn("value", lower(col("value")))
    # Extract the first run of letters from each token, dropping punctuation and digits
    words_df = input_df.select(regexp_extract(col("value"), "[a-z]+", 0).alias("word"))
    # Remove null or blank strings (digits are already stripped by the regex above)
    words_df = words_df.filter(col("word").isNotNull() & (col("word") != ""))
    # Perform word count
    word_count_df = words_df.groupBy("word").count()
    # Sort the words in descending order of count
    word_count_df = word_count_df.orderBy(col("count").desc())
    word_count_df.show()
    # Write the word count results to a CSV file
    word_count_df.coalesce(1).write.csv(output_file_path, header=True)
    # Stop SparkSession
    spark.stop()
# Usage example:
output_file_path = "output_word_count_result.csv"
word_count(output_file_path)
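For comparison, here is a minimal RDD-style sketch of the same word count (it assumes test_data and an active SparkSession named spark are in scope, e.g. placed inside the function before spark.stop()):
# RDD-style word count sketch
import re

def first_word(token):
    # Mirrors regexp_extract(col, "[a-z]+", 0): first run of letters, or "" if none
    m = re.search(r"[a-z]+", token)
    return m.group(0) if m else ""

counts = (spark.sparkContext.parallelize(test_data)
          .flatMap(lambda line: line.lower().split(" "))
          .map(first_word)
          .filter(lambda w: w != "")
          .map(lambda w: (w, 1))
          .reduceByKey(lambda a, b: a + b)
          .sortBy(lambda kv: kv[1], ascending=False))
print(counts.collect())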
Winding Up
I hope these questions give you an idea of the types of questions asked, along with the confidence to navigate PySpark interviews and demonstrate your expertise in handling big data processing tasks. All of these questions involve different PySpark functions, and it is important to know and understand their usage when you are asked to generate the required output. And yes, there are multiple ways to answer them; I have only shared my approach.
I will share more questions in subsequent parts of the PySpark Interview Questions articles. I hope this encourages you to delve deeper into PySpark's functionality and to engage in hands-on practice.