Execute Hive Script in AWS Elastic MapReduce (EMR)

There are three ways we can execute a Hive script in EMR:

  • EMR Cluster Console
  • PuTTY or another SSH client
  • Your own code, using an AWS SDK (Python, Java, Ruby, .NET)

 

Below I have written a Hive script which will export data from DynamoDB to S3. Before running this script, you will have to create a DynamoDB table and an S3 bucket for the export file.

CREATE EXTERNAL TABLE ddbmember (id bigint,name string,city string,state string,age bigint)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
TBLPROPERTIES ("dynamodb.table.name" = "memberinfo",
"dynamodb.column.mapping" = "id:id,name:name,city:city,state:state,age:age"); 
 
CREATE EXTERNAL TABLE s3member (id bigint,name string,city string,state string,age bigint)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
LOCATION 's3://test.emr/export/'; 
 
INSERT OVERWRITE TABLE s3member SELECT * 
FROM ddbmember;

DROP TABLE ddbmember;
DROP TABLE s3member; 

First, we create an external table mapped to the DynamoDB table. The "id" field must have the same data type as the DynamoDB table's hash key (a numeric type). Then we create an external table over the S3 export bucket. Finally, the "INSERT OVERWRITE" statement exports the full DynamoDB table to the S3 bucket.

 

The Hive script file has to be uploaded to an S3 bucket before following the instructions in the next sections. Below I have described the three ways of executing the Hive script.
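
For example, the script file can be uploaded with the AWS .NET SDK. Below is a minimal sketch; the bucket name, key, and local file path are placeholders for illustration.

using Amazon;
using Amazon.S3;
using Amazon.S3.Model;

// Upload the Hive script to S3 so EMR can read it.
using (var s3Client = new AmazonS3Client(RegionEndpoint.EUWest1))
{
    var putRequest = new PutObjectRequest
    {
        BucketName = "test.emr",                // placeholder bucket
        Key = "scripts/export.hql",             // placeholder key
        FilePath = @"C:\scripts\export.hql"     // placeholder local path
    };
    s3Client.PutObject(putRequest);
}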

EMR Console

Follow the steps below:

  • Navigate to EMR console > Cluster List > a waiting EMR cluster.
  • Create a new step.
  • Enter the script's S3 location.
  • Click Create.

AWS will execute the script automatically and report progress in the cluster console.

 

PuTTY

Using the PuTTY client, we can connect to the EMR instance directly and execute the Hive script just as we would against a traditional database. The article below describes how to configure PuTTY:

http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/putty.html

The article below describes how to connect PuTTY to the EMR cluster's Hive:

http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-connect-master-node-ssh.html

 

To summarize the AWS developer notes, the steps are:

  • Using puttygen.exe, create a private key from the key pair .pem file. Make sure your EMR cluster was created with the same key pair. PuTTYgen will create a .ppk file. Follow this post for details.
  • Under the Session tab, enter the EMR cluster master node URL, prefixed with "hadoop@".
  • Under Connection > SSH > Auth, load the .ppk file.
  • Under Tunnels, add the following:
    • Destination: master node URL:8888
    • Source port: 8157
  • After adding the tunnel, click Open.
  • After connecting to EMR, type "hive".
  • Execute the Hive script using the Hive console, as shown below.
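
Once connected to the master node, the script can be run from the Hive console. A minimal sketch, assuming the script was uploaded to the hypothetical location s3://test.emr/scripts/export.hql:

# start the Hive console and paste the script statements at the hive> prompt
hive

# or execute the uploaded script file in one step; on EMR, Hive can read
# the script directly from S3
hive -f s3://test.emr/scripts/export.hql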

 

.NET SDK

We can create an EMR step that runs the script file using the AWS .NET SDK. The prerequisites are:

  • AWS .NET SDK – Core and EMR
  • An EMR cluster instance
  • An S3 bucket for the script

The implementation steps are:

  • Create a new EMR cluster just to execute this script, or use an existing running EMR instance. You will have to collect the job flow ID of the running instance from EMR console > EMR Cluster List.
  • Create a step.
  • Attach the script to the step.
  • Wait for the EMR step execution to complete.
  • Terminate the EMR cluster if required.

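The code below reads an EmrClient field that is not shown in the listing. A minimal sketch of how it might be initialized (the field name and region are assumptions):

// Hypothetical initialization of the EmrClient field used below; the
// region should match the one where the cluster is running.
private static readonly AmazonElasticMapReduceClient EmrClient =
    new AmazonElasticMapReduceClient(RegionEndpoint.EUWest1);
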
Below is the .NET implementation:

public void RunHiveScriptStep(string activeWaitingJobFlowId, string scriptS3Location, bool isTerminateCluster)
{
    try
    {
        if (!string.IsNullOrEmpty(activeWaitingJobFlowId))
        {
            // Build a step that runs the Hive script stored in S3.
            StepFactory stepFactory = new StepFactory(RegionEndpoint.EUWest1);
            StepConfig runHiveScript = new StepConfig()
            {
                Name = "Run Hive script",
                HadoopJarStep = stepFactory.NewRunHiveScriptStep(scriptS3Location),
                ActionOnFailure = "TERMINATE_JOB_FLOW"
            };

            // Attach the step to the running cluster (job flow).
            AddJobFlowStepsRequest addHiveRequest = new AddJobFlowStepsRequest(activeWaitingJobFlowId, new List<StepConfig>() { runHiveScript });
            AddJobFlowStepsResponse addHiveResponse = EmrClient.AddJobFlowSteps(addHiveRequest);
            List<string> stepIds = addHiveResponse.StepIds;
            String hiveStepId = stepIds[0];

            // Poll the step status until it completes or fails.
            DescribeStepRequest describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
            DescribeStepResponse describeHiveStepResult = EmrClient.DescribeStep(describeHiveStepRequest);
            Step hiveStep = describeHiveStepResult.Step;
            StepStatus hiveStepStatus = hiveStep.Status;
            string hiveStepState = hiveStepStatus.State.Value.ToLower();
            bool failedState = false;
            StepTimeline finalTimeline = null;
            while (hiveStepState != "completed")
            {
                describeHiveStepRequest = new DescribeStepRequest() { ClusterId = activeWaitingJobFlowId, StepId = hiveStepId };
                describeHiveStepResult = EmrClient.DescribeStep(describeHiveStepRequest);
                hiveStep = describeHiveStepResult.Step;
                hiveStepStatus = hiveStep.Status;
                hiveStepState = hiveStepStatus.State.Value.ToLower();
                finalTimeline = hiveStepStatus.Timeline;
                Console.WriteLine(string.Format("Current state of Hive script execution: {0}", hiveStepState));
                switch (hiveStepState)
                {
                    case "pending":
                    case "running":
                        // Still in progress; wait before polling again.
                        Thread.Sleep(10000);
                        break;
                    case "cancelled":
                    case "failed":
                    case "interrupted":
                        failedState = true;
                        break;
                }
                if (failedState)
                {
                    break;
                }
            }
            if (finalTimeline != null)
            {
                Console.WriteLine(string.Format("Hive script step {0} created at {1}, started at {2}, finished at {3}",
                    hiveStepId, finalTimeline.CreationDateTime, finalTimeline.StartDateTime, finalTimeline.EndDateTime));
            }

            // Optionally shut down the cluster once the step has finished.
            if (isTerminateCluster)
            {
                TerminateJobFlowsRequest terminateRequest =
                    new TerminateJobFlowsRequest(new List<string> { activeWaitingJobFlowId });
                TerminateJobFlowsResponse terminateResponse = EmrClient.TerminateJobFlows(terminateRequest);
            }
        }
        else
        {
            Console.WriteLine("No valid job flow could be created.");
        }
    }
    catch (AmazonElasticMapReduceException emrException)
    {
        Console.WriteLine("Hive script execution step has failed.");
        Console.WriteLine("Amazon error code: {0}",
            string.IsNullOrEmpty(emrException.ErrorCode) ? "None" : emrException.ErrorCode);
        Console.WriteLine("Exception message: {0}", emrException.Message);
    }
}

The "RunHiveScriptStep" method expects three parameters:

  • activeWaitingJobFlowId: the job flow ID of the running instance. You can collect this ID from the EMR console.
  • scriptS3Location: the S3 location of the script file.
  • isTerminateCluster: whether to terminate the cluster after execution.
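
A hypothetical call, with placeholder values for the job flow ID and script location:

// Run the export script on an existing cluster and terminate the
// cluster once the step has finished.
RunHiveScriptStep("j-XXXXXXXXXXXXX", "s3://test.emr/scripts/export.hql", true);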

AWS provides SDKs for other languages such as Python, Java, and Ruby, so you can implement the same thing in those languages as well.

Related Post: CREATE AND CONFIGURE AWS ELASTIC MAPREDUCE (EMR) CLUSTER

Reference: Using Amazon Elastic MapReduce with the AWS.NET API Part 4: Hive basics with Hadoop

…………………….

Khayer