我们将使用此笔记本来验证Aurora MySQL数据库是否已启动并正在运行。该数据库将用作交易数据源,我们将从该笔记本执行数据随机更新以便于模拟生产环境中不断传入的业务数据。

让我们首先测试与数据库的连接性: 让我们运行一些SQL语句。我们将使用以下辅助函数来执行SQL语句:
import MySQLdb,random,time
host = '###aurora_endpoint###'
user = 'master'
password = 'S3cretPwd99'
port = 3306
db = 'salesdb'
conn = MySQLdb.Connection(
host=host,
user=user,
passwd=password,
port=port,
db=db
)
def execute_sql(sql):
conn.query(sql)
result = conn.store_result()
for i in range(result.num_rows()):
print(result.fetch_row())
def execute_dml(sql):
conn.query(sql)
rowcount = conn.affected_rows()
print ("Rows updated: %d"%rowcount)
conn.commit()
execute_sql("show tables")
这是一个通用的SALES OLTP模式。在上面的表中,SALES_ORDER_DETAIL是我们将模拟更新的表。
让我们以SALES_ORDER_DETAIL表中的随机顺序执行一次更新。
# Example of how to update a random order.
order_id=random.randint(1,29000)
print ("Original Values: ")
execute_sql("SELECT ORDER_ID, LINE_NUMBER, QUANTITY FROM SALES_ORDER_DETAIL WHERE ORDER_ID = %d"%order_id)
execute_dml("UPDATE SALES_ORDER_DETAIL set QUANTITY = QUANTITY + 1 WHERE ORDER_ID = %d"%order_id)
print ("Updated Values: ")
execute_sql("SELECT ORDER_ID, LINE_NUMBER, QUANTITY FROM SALES_ORDER_DETAIL WHERE ORDER_ID = %d"%order_id)
以上代码的运行您将获得如下的返回

在Chrome中设置扩展名为“ FoxyProxy Standard”:
打开通往EMR Spark集群的SSL隧道
打开终端并运行以下命令以启动SSL隧道。
当询问是否将主机添加到受信任的主机时,请选择“是”。
该命令不应返回,表明隧道已打开。
现在您应该可以在Chrome打开 http://###emr_spark_cluster_master###:8088
仅适用于Windows用户, 请按照以下链接中的说明进行操作:
使用PuTTYgen转换私钥: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/putty.html
使用Putty启动到EMR主节点的SSL隧道 https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-ssh-tunnel.html#emr-ssh-tunnel-win
$> ssh -i ee-default-keypair.pem hadoop@###emr_spark_cluster_master### -ND 8157
在这一步中,我们将使用AWS DMS执行从该数据库到S3的全部数据加载

让我们对数据进行一些随机更新。我们将使用下面的帮助器功能来执行这些更新。
def perform_random_updates():
order_id=random.randint(1,29000)
execute_dml("UPDATE SALES_ORDER_DETAIL set QUANTITY = QUANTITY + 1 WHERE ORDER_ID = %d"%order_id)
while (True):
perform_random_updates()
以上代码的运行您将获得如下的返回

请在一段时间后停止执行上述单元格,以便停止源源不断的创造数据的更新。在本实验下面的环节我们将切换到其他笔记本。但是,请在Jupyter窗口中将此笔记本保持打开状态,因为稍后我们将再次使用它来模拟更多更新。