Questions and Answers
Question rOsq6LS2MwxoYcQmIw38
Question
A data team’s Structured Streaming job is configured to calculate running aggregates for item sales to update a downstream marketing dashboard. The marketing team has introduced a new promotion, and they would like to add a new field to track the number of times this promotion code is used for each item. A junior data engineer suggests updating the existing query as follows. Note that proposed changes are in bold.
Original query:
//IMG//
Proposed query:
//IMG//
Which step must also be completed to put the proposed query into production?
Choices
- A: Specify a new checkpointLocation
- B: Increase the shuffle partitions to account for additional aggregates
- C: Run REFRESH TABLE delta.`/item_agg`
- D: Register the data in the "/item_agg" directory to the Hive metastore
- E: Remove .option("mergeSchema", "true") from the streaming write
answer?
Answer: A Answer_ET: A Community answer A (100%) Discussion
Comment 1102541 by f728f7f
- Upvotes: 24
This question is broken. Proposed query cannot be identified.
Comment 1330263 by AlejandroU
- Upvotes: 5
Selected Answer: A Below is the proposed query:
df.groupBy("item")
  .agg(count("item").alias("total_count"),
       mean("sale_price").alias("avg_price"),
       count("promo_code = 'NEW MEMBER'").alias("new_member_promo"))
  .writeStream
  .outputMode("complete")
  .option("mergeSchema", "true")
  .option("checkpointLocation", "/item_agg/checkpoint")
  .start("/item_agg")
Answer A. When updating the schema of a streaming job by adding new fields (like the new_member_promo field), it’s important to use a new checkpoint location. This is because the existing checkpoint location is tied to the old schema, and adding a new field could lead to schema mismatch issues.
Comment 1326473 by kino_1994
- Upvotes: 2
Selected Answer: A Since the new field is a count (an aggregation), it is non-nullable, making the change incompatible with the existing schema. This requires a new checkpointLocation to avoid schema mismatch issues. Additionally, the “mergeSchema=true” option must remain enabled to allow Spark to handle the schema evolution properly.
However, if the field were nullable and not an aggregation, it would be a backward-compatible change, allowing the checkpoint to remain unchanged, as happens with schema evolution in Kafka. In this case, the correct answer is A.
Comment 1326061 by Sriramiyer92
- Upvotes: 1
Selected Answer: A The given answer is correct. When new columns are added (or the aggregation changes), the checkpoint location also needs to change.
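The fix the commenters describe can be sketched in PySpark as below. This is an untested sketch, not the exam's exact query: `df`, the column names, and the `checkpoint_v2` directory are illustrative assumptions. The key point is the fresh checkpointLocation, since the old checkpoint is tied to the old aggregation state schema.

```python
# Hypothetical sketch: restart the aggregation with the new promo field
# and a NEW checkpoint location (the old one encodes the old state schema).
from pyspark.sql import functions as F

(df.groupBy("item")
   .agg(F.count("item").alias("total_count"),
        F.mean("sale_price").alias("avg_price"),
        # Count only rows where the promo code matches; F.count ignores NULLs,
        # and F.when yields NULL when the condition is false.
        F.count(F.when(F.col("promo_code") == "NEW MEMBER", 1))
         .alias("new_member_promo"))
   .writeStream
   .outputMode("complete")
   .option("mergeSchema", "true")
   .option("checkpointLocation", "/item_agg/checkpoint_v2")  # fresh checkpoint
   .start("/item_agg"))
```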
Question UA1iAPk9u9ejipxfyymG
Question
A Structured Streaming job deployed to production has been resulting in higher than expected cloud storage costs. At present, during normal execution, each microbatch of data is processed in less than 3s; at least 12 times per minute, a microbatch is processed that contains 0 records. The streaming write was configured using the default trigger settings. The production job is currently scheduled alongside many other Databricks jobs in a workspace with instance pools provisioned to reduce start-up time for jobs with batch execution.
Holding all other variables constant and assuming records need to be processed in less than 10 minutes, which adjustment will meet the requirement?
Choices
- A: Set the trigger interval to 3 seconds; the default trigger interval is consuming too many records per batch, resulting in spill to disk that can increase volume costs.
- B: Increase the number of shuffle partitions to maximize parallelism, since the trigger interval cannot be modified without modifying the checkpoint directory.
- C: Set the trigger interval to 10 minutes; each batch calls APIs in the source storage account, so decreasing trigger frequency to maximum allowable threshold should minimize this cost.
- D: Set the trigger interval to 500 milliseconds; setting a small but non-zero trigger interval ensures that the source is not queried too frequently.
- E: Use the trigger once option and configure a Databricks job to execute the query every 10 minutes; this approach minimizes costs for both compute and storage.
answer?
Answer: C Answer_ET: C Community answer C (54%) E (46%) Discussion
Comment 1330365 by AlejandroU
- Upvotes: 1
Selected Answer: C Answer C. Setting the trigger interval to 10 minutes (option C) directly aligns with the requirement to process records within a 10-minute window. It achieves the same reduction in processing frequency as option E but without the added complexity of job scheduling or reliance on trigger once. Using the trigger once option requires external orchestration (e.g., a scheduled Databricks job) to execute every 10 minutes. This adds operational overhead and potential delays due to job scheduling or startup times, especially in a shared workspace using instance pools.
Comment 1328719 by UrcoIbz
- Upvotes: 2
Selected Answer: C In my opinion, both C and E meet the requirements. But the question says "Holding all other variables constant". This indicates to me that E cannot be the solution, as new variables are introduced.
Comment 1321764 by benni_ale
- Upvotes: 1
Selected Answer: E The fact that the question mentions provisioned instance pools makes me guess that we should go for the trigger-once option; otherwise the instance pools are useless.
Comment 1290660 by pk07
- Upvotes: 2
Selected Answer: C E is wrong: using trigger once would stop the stream after one execution, not meeting the requirement of continuous processing.
Comment 1267456 by practicioner
- Upvotes: 1
Selected Answer: E E is correct for two reasons:
- we are using the instance pool, which allows our job to start almost instantly
- the question is about reducing costs. Triggering once per 10 minutes avoids keeping a VM running (as in option C) while keeping the same SLA (due to 1), with lower cost for compute as well as for storage (fewer API calls, which are not free)
Comment 1191297 by Er5
- Upvotes: 3
The requirement is "to be processed in less than 10 minutes". C: "set the trigger interval to 10 minutes" means process time + interval > 10 minutes. E: "trigger once" with "execute the query every 10 minutes" fits the bound.
Comment 1169449 by vikram12apr
- Upvotes: 2
Selected Answer: E The default trigger time is 0.5 seconds, hence 120 triggers happen per minute. Each trigger takes 3 seconds to complete, and 120 * 3 = 360 seconds = 6 minutes, so the job is busy for 6 minutes. That leaves a buffer of 4 minutes which can be used for compute spin-up, and since we are using instance pools, the start-up time decreases further. I think E is the correct option to decrease the cost.
Comment 1167765 by hidelux
- Upvotes: 3
Selected Answer: E The question indicates that they are using instance pools for fast startup time. option C would block a VM permanently which is not intended. E will grab a VM, run the job, and return it to the pool to be available for other jobs mentioned in the question.
Comment 1132843 by spaceexplorer
- Upvotes: 3
Selected Answer: C C is more effective than E, as E will incur start-up time for spinning up a new job cluster
Comment 1129924 by ranith
- Upvotes: 1
The default trigger interval is 500 ms, but the question says the job processes batches with 0 records and the average processing time is 3 s. If the requirement is to process under 10 minutes, the best option here is to trigger every 3 s.
Comment 1111451 by divingbell17
- Upvotes: 2
Selected Answer: C Both C and E meet the requirement to reduce cloud storage cost. E further reduces compute cost however reducing compute cost is not a requirement in the question.
Comment 1099611 by alexvno
- Upvotes: 3
Selected Answer: C For production → records need to be processed in less than 10 minutes. So we need to schedule every 10 minutes
Comment 1076601 by aragorn_brego
- Upvotes: 4
Selected Answer: E Given that there are frequent microbatches with 0 records being processed, it indicates that the job is polling the source too often. Using the “trigger once” option would allow each microbatch to process all available data and then stop. By scheduling the job to run every 10 minutes, you ensure that the system is not constantly checking for new data when there is none, thus reducing the number of read operations from the source storage and potentially reducing costs associated with those reads.
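The two candidate configurations debated above can be sketched as follows. This is an untested sketch: `df`, the checkpoint path, and the sink location are hypothetical, and it only illustrates how each trigger would be expressed.

```python
# Option C: an always-on stream that polls the source every 10 minutes.
query_c = (df.writeStream
             .trigger(processingTime="10 minutes")
             .option("checkpointLocation", "/chk_c")   # hypothetical path
             .start("/bronze"))

# Option E: drain everything available, then stop; a scheduled Databricks
# job re-runs this every 10 minutes. (trigger(availableNow=True) is the
# modern replacement for the older trigger(once=True).)
query_e = (df.writeStream
             .trigger(availableNow=True)
             .option("checkpointLocation", "/chk_e")
             .start("/bronze"))
```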
Question oqBpL4l1E6rNZzJqaNKl
Question
Which statement describes the correct use of pyspark.sql.functions.broadcast?
Choices
- A: It marks a column as having low enough cardinality to properly map distinct values to available partitions, allowing a broadcast join.
- B: It marks a column as small enough to store in memory on all executors, allowing a broadcast join.
- C: It caches a copy of the indicated table on attached storage volumes for all active clusters within a Databricks workspace.
- D: It marks a DataFrame as small enough to store in memory on all executors, allowing a broadcast join.
- E: It caches a copy of the indicated table on all nodes in the cluster for use in all future queries during the cluster lifetime.
answer?
Answer: D Answer_ET: D Community answer D (100%) Discussion
Comment 1220623 by Freyr
- Upvotes: 3
Selected Answer: D Correct Answer: D. It marks a DataFrame as small enough to store in memory on all executors, allowing a broadcast join.
Comment 1076602 by aragorn_brego
- Upvotes: 3
Selected Answer: D The broadcast function in PySpark is used in the context of joins. When you mark a DataFrame with broadcast, Spark tries to send this DataFrame to all worker nodes so that it can be joined with another DataFrame without shuffling the larger DataFrame across the nodes. This is particularly beneficial when the DataFrame is small enough to fit into the memory of each node. It helps to optimize the join process by reducing the amount of data that needs to be shuffled across the cluster, which can be a very expensive operation in terms of computation and time.
Comment 1066256 by Dileepvikram
- Upvotes: 1
Answer is D
Comment 1063420 by PearApple
- Upvotes: 1
The answer is D
Comment 1057120 by hm358
- Upvotes: 2
Selected Answer: D https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.broadcast.html
Comment 1053469 by sturcu
- Upvotes: 3
Selected Answer: D Marks a DataFrame as small enough for use in broadcast joins.
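A minimal sketch of the usage described above, assuming hypothetical table and column names (untested here, since it needs a running Spark cluster). Note that broadcast() marks a DataFrame, not a column.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()
large_df = spark.table("sales")        # hypothetical large fact table
small_df = spark.table("item_lookup")  # hypothetical small dimension table

# The small DataFrame is shipped to every executor's memory, so the large
# DataFrame does not need to be shuffled across the cluster for the join.
joined = large_df.join(broadcast(small_df), on="item_id", how="left")
```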
Question VmfletzXAl3bWxsHTY1D
Question
A data engineer is configuring a pipeline that will potentially see late-arriving, duplicate records.
In addition to de-duplicating records within the batch, which of the following approaches allows the data engineer to deduplicate data against previously processed records as it is inserted into a Delta table?
Choices
- A: Set the configuration delta.deduplicate = true.
- B: VACUUM the Delta table after each batch completes.
- C: Perform an insert-only merge with a matching condition on a unique key.
- D: Perform a full outer join on a unique key and overwrite existing data.
- E: Rely on Delta Lake schema enforcement to prevent duplicate records.
answer?
Answer: C Answer_ET: C Community answer C (93%) 7% Discussion
Comment 1053472 by sturcu
- Upvotes: 5
Selected Answer: C Merge: when not matched, insert
Comment 1338910 by lene
- Upvotes: 1
Selected Answer: C this question was in the Databricks DE Professional exam guide
Comment 1320114 by hebied
- Upvotes: 1
Selected Answer: D Option C is tricky, since it should be a merge on the NOT MATCHED condition rather than on a match, so option D seems more suitable
Comment 1076605 by aragorn_brego
- Upvotes: 4
Selected Answer: C To handle deduplication against previously processed records in a Delta table, the MERGE INTO command can be used to perform an upsert operation. This means that if the incoming data has a record that matches an existing record based on a unique key, the MERGE INTO operation can update the existing record (if needed) or simply ignore the duplicate. If there is no match (i.e., the record is new), then the record will be inserted.
Comment 1071855 by 60ties
- Upvotes: 2
Selected Answer: C answer is C
Comment 1066258 by Dileepvikram
- Upvotes: 2
Answer is C
Comment 1057121 by hm358
- Upvotes: 2
Selected Answer: C merge will be more efficient
Comment 1050930 by Crocjun
- Upvotes: 1
C Reference: file:///C:/Users/yuen1/Downloads/databricks-certified-data-engineer-professional-exam-guide.pdf
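The insert-only merge semantics can be illustrated outside Spark with SQLite's INSERT ... ON CONFLICT DO NOTHING, which behaves like Delta's WHEN NOT MATCHED THEN INSERT for this purpose. Table and key names here are made up for illustration.

```python
import sqlite3

# In-memory table with a unique key, standing in for the Delta target table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (unique_key TEXT PRIMARY KEY, payload TEXT)")
conn.execute("INSERT INTO target VALUES ('k1', 'first')")

# A late-arriving batch containing a duplicate (k1) and a new record (k2).
batch = [("k1", "duplicate"), ("k2", "second")]

# Insert-only merge analogue: matching keys are ignored, new keys inserted.
conn.executemany(
    "INSERT INTO target VALUES (?, ?) ON CONFLICT(unique_key) DO NOTHING",
    batch,
)
rows = sorted(conn.execute("SELECT unique_key, payload FROM target"))
```

On Databricks this corresponds to MERGE INTO target USING updates ON target.unique_key = updates.unique_key WHEN NOT MATCHED THEN INSERT *, typically after dropDuplicates on the incoming batch.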
Question dPEvuEwrLsNOIa2csn8V
Question
A data pipeline uses Structured Streaming to ingest data from Apache Kafka to Delta Lake. Data is being stored in a bronze table, and includes the Kafka-generated timestamp, key, and value. Three months after the pipeline is deployed, the data engineering team has noticed some latency issues during certain times of the day.
A senior data engineer updates the Delta Table’s schema and ingestion logic to include the current timestamp (as recorded by Apache Spark) as well as the Kafka topic and partition. The team plans to use these additional metadata fields to diagnose the transient processing delays.
Which limitation will the team face while diagnosing this problem?
Choices
- A: New fields will not be computed for historic records.
- B: Spark cannot capture the topic and partition fields from a Kafka source.
- C: New fields cannot be added to a production Delta table.
- D: Updating the table schema will invalidate the Delta transaction log metadata.
- E: Updating the table schema requires a default value provided for each field added.
answer?
Answer: A Answer_ET: A Community answer A (100%) Discussion
Comment 1108863 by dmov
- Upvotes: 8
Selected Answer: A Looks like A to me. Does anyone think otherwise?
Comment 1141693 by vctrhugo
- Upvotes: 6
Selected Answer: A When the schema of a Delta table is updated to include new fields, these fields will only be populated for new records ingested after the schema update. The new fields will not be retroactively computed for historic records already stored in the Delta table. Therefore, the additional metadata fields (current timestamp, Kafka topic, and partition) will not exist in the historic data, limiting the scope of the diagnosis to new data ingested after the schema update.
Comment 1340301 by RandomForest
- Upvotes: 1
Selected Answer: A The correct answer is A.
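The schema change discussed in this question might look like the following untested sketch; the broker address, topic, and column names are assumptions. Spark's Kafka source does expose topic and partition columns (which rules out B), but rows ingested before the change will simply hold NULL for the new fields, which is the limitation in A.

```python
# Hypothetical sketch: enrich the bronze ingest with Spark-side metadata.
# Only records ingested after this change carry the new fields.
from pyspark.sql import functions as F

enriched = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "host:9092")  # hypothetical
              .option("subscribe", "events")                   # hypothetical topic
              .load()
              .select("timestamp", "key", "value",
                      "topic", "partition",  # metadata the Kafka source exposes
                      F.current_timestamp().alias("processed_time")))
```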