GDPR has made deletes a must-have tool in everyone's data management toolbox. By allowing users to specify different record payload implementations, Apache Hudi supports two types of deletes on data stored in Hudi datasets.
Soft Deletes: With a soft delete, you retain the record key but null out the values of all other fields. This is easily achieved by ensuring the appropriate fields are nullable in the dataset schema, and then simply upserting the dataset after setting those fields to null.
Hard Deletes: A hard delete is the stronger form; it physically removes any trace of the record from the dataset.
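To make the soft-delete mechanism concrete, here is a minimal sketch (the table name some_hudi_table is a placeholder, and it assumes the quantity column is nullable in the dataset schema):

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.IntegerType

// Keep the record key, null out the field we want to erase, then upsert
val softDeletedDF = spark.sql("select * from some_hudi_table where order_id = 10001").withColumn("quantity", lit(null).cast(IntegerType))

Upserting softDeletedDF back to the table, exactly as in the upsert shown later in this section, overwrites the stored record with the nulled-out version.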
Now let's perform some deletes on our dataset.
%%configure -f
{
  "conf": {
    "spark.jars": "hdfs:///hudi-spark-bundle.jar,hdfs:///spark-avro.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false",
    "spark.dynamicAllocation.executorIdleTimeout": 3600,
    "spark.executor.memory": "7G",
    "spark.executor.cores": 1,
    "spark.dynamicAllocation.initialExecutors": 16,
    "spark.sql.parquet.outputTimestampType": "TIMESTAMP_MILLIS"
  }
}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import com.google.common.collect.Lists
import org.apache.hudi.ComplexKeyGenerator
import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.functions.{year, month, dayofmonth, hour}

Let's select some records to test the soft delete functionality:
//Hudi Copy on Write Table
val s3_bucket="###s3_bucket###"
val hudiTableName = "sales_order_detail_hudi_cow"
val hudiTableRecordKey = "record_key"
val hudiTablePartitionKey = "partition_key"
val hudiTablePrecombineKey = "order_date"
val hudiTablePath = s"s3://$s3_bucket/hudi/" + hudiTableName
val hudiHiveTablePartitionKey = "year,month"
Let's pick some random order_id values for this exercise:
val df=spark.sql("select order_id, quantity, order_date from "+hudiTableName+" where order_id in (10001,10002,10003)")
df.show(100,false)

import org.apache.spark.sql.types.IntegerType
val df=spark.sql("select * from "+hudiTableName+" where order_id in (10001,10002,10003)")
df.printSchema()
// Soft delete: blank out the quantity field (here with a -1 sentinel rather than null)
val updatedDF = df.withColumn("quantity", lit("-1").cast(IntegerType))
updatedDF.printSchema()

Set up our Hudi data source options:
val hudiOptions = Map[String,String](
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> hudiTableRecordKey,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> hudiTablePartitionKey,
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> hudiTablePrecombineKey,
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> hudiHiveTablePartitionKey,
  DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> "false",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
  // File sizing: cap data files at 128 MB, and treat files under 64 MB as
  // small files that later inserts can be packed into
  "hoodie.parquet.max.file.size" -> String.valueOf(128 * 1024 * 1024),
  "hoodie.parquet.small.file.limit" -> String.valueOf(64 * 1024 * 1024),
  // Estimated compression ratio used when sizing output files
  "hoodie.parquet.compression.ratio" -> String.valueOf(0.5))
(updatedDF.write
  .format("org.apache.hudi")
  // Copy on Write table
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .options(hudiOptions)
  .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudiTableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(hudiTablePath))

Now let's look at the changed data in the table:
val df=spark.sql("select order_id, quantity, order_date from "+hudiTableName+" where order_id in (10001,10002,10003)")
df.show(100,false)

We can see that the quantity field has been updated. In essence, then, a soft delete is an update that blanks out certain fields. You would typically do this to PII or PHI columns to anonymize records.
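For instance, a minimal anonymization sketch (the customer_name and customer_email columns are hypothetical here, and are assumed to be nullable in the schema):

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// Null out hypothetical PII columns while keeping the record key intact
val anonymizedDF = df.withColumn("customer_name", lit(null).cast(StringType)).withColumn("customer_email", lit(null).cast(StringType))
// anonymizedDF would then be written back with the same upsert used above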
Let's test the hard delete functionality:
val deleteDF=spark.sql("select * from "+hudiTableName+" where order_id in (10001,10002,10003)")
deleteDF.count()
(deleteDF.write
  .format("org.apache.hudi")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .options(hudiOptions)
  .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudiTableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  // Empty out the rows: the EmptyHoodieRecordPayload deletes the matching records
  .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
  .mode(SaveMode.Append)
  .save(hudiTablePath))
val df=spark.sql("select * from "+hudiTableName+" where order_id in (10001,10002,10003)")
df.count()
We can see that the records have been deleted from our data lake.
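As an aside, newer Hudi releases (0.5.1 and later) also expose a dedicated delete write operation. Here is a hedged sketch of the same hard delete using it, assuming such a version is on the cluster; no payload class override is needed because the operation itself removes the matching records:

(deleteDF.write
  .format("org.apache.hudi")
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_STORAGE_TYPE_OPT_VAL)
  .options(hudiOptions)
  .option(HoodieWriteConfig.TABLE_NAME, hudiTableName)
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudiTableName)
  // "delete" operation: records with matching keys are removed from the dataset
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
  .mode(SaveMode.Append)
  .save(hudiTablePath))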