实验1-Aurora+DMS+MSK数据流

Aurora作为源数据通过DMS将全量和增量数据流动至MSK

实验目录:

概览

我们将使用此笔记本来验证Aurora MySQL数据库是否已启动并正在运行。该数据库将用作交易数据源,我们将从该笔记本执行数据随机更新以便于模拟生产环境中不断传入的业务数据。

Architecture

Aurora MySQL作为源数据库

让我们首先测试与数据库的连接性: 让我们运行一些SQL语句。我们将使用以下辅助函数来执行SQL语句:

import MySQLdb,random,time

host = '###aurora_endpoint###'
user = 'master'
password = 'S3cretPwd99'
port = 3306
db = 'salesdb'

conn = MySQLdb.Connection(
    host=host,
    user=user,
    passwd=password,
    port=port,
    db=db
)

def execute_sql(sql):
    conn.query(sql)
    result = conn.store_result()
    for i in range(result.num_rows()):
        print(result.fetch_row())
        
def execute_dml(sql):
    conn.query(sql)
    rowcount = conn.affected_rows()
    print ("Rows updated: %d"%rowcount)
    conn.commit()

execute_sql("show tables")

这是一个通用的SALES OLTP模式。在上面的表中,SALES_ORDER_DETAIL是我们将模拟更新的表。

让我们以SALES_ORDER_DETAIL表中的随机顺序执行一次更新。

# Example of how to update a random order.
order_id=random.randint(1,29000)
print ("Original Values: ")

execute_sql("SELECT ORDER_ID, LINE_NUMBER, QUANTITY FROM SALES_ORDER_DETAIL WHERE ORDER_ID = %d"%order_id)
execute_dml("UPDATE SALES_ORDER_DETAIL set QUANTITY = QUANTITY + 1 WHERE ORDER_ID = %d"%order_id)
print ("Updated Values: ")

execute_sql("SELECT ORDER_ID, LINE_NUMBER, QUANTITY FROM SALES_ORDER_DETAIL WHERE ORDER_ID = %d"%order_id)

以上代码的运行您将获得如下的返回

访问 EMR Web UI

  • 在Chrome中设置扩展名为“ FoxyProxy Standard”:

    • 使用Google Chrome安装和配置扩展名“ FoxyProxy Standard”(推荐)
    • 在本地下载此文件:Foxy代理配置
    • 转到Chrome中的更多工具->扩展程序(转到chrome:// extensions)
    • 确保已启用扩展名“ FoxyProxy标准”。
    • 单击扩展的“详细信息”,然后单击“扩展选项”。
    • 在“导入/导出”页面上,选择“选择文件”,浏览至您创建的foxyproxy-settings.xml文件的位置,选择该文件,然后选择“打开”。
    • 当提示您覆盖现有设置时,选择“替换”。
    • 对于代理模式,选择“基于代理的预定义模式和优先级使用代理”。
  • 打开通往EMR Spark集群的SSL隧道

$> ssh -i ee-default-keypair.pem  hadoop@###emr_spark_cluster_master### -ND 8157

AWS DMS 全量加载

在这一步中,我们将使用AWS DMS执行从该数据库到S3的全部数据加载

  • 通过单击服务-> DMS导航到DMS控制台。
  • 在DMS控制台的左侧面板中找到菜单项转换和迁移->数据库迁移任务。
  • 选择唯一的复制任务项,然后单击操作->重新启动/继续按钮以启动此任务。
  • 您可以通过单击任务链接来监视此任务的进度。

模拟随机更新

让我们对数据进行一些随机更新。我们将使用下面的帮助器功能来执行这些更新。

def perform_random_updates():
    order_id=random.randint(1,29000)
    execute_dml("UPDATE SALES_ORDER_DETAIL set QUANTITY = QUANTITY + 1 WHERE ORDER_ID = %d"%order_id)

while (True):
    perform_random_updates()

以上代码的运行您将获得如下的返回

请在一段时间后停止执行上述单元格,以便停止源源不断的创造数据的更新。在本实验下面的环节我们将切换到其他笔记本。但是,请在Jupyter窗口中将此笔记本保持打开状态,因为稍后我们将再次使用它来模拟更多更新。