Set up an AWS “Instance Profile” for on-prem servers


I assume you already know what an EC2 instance profile is. In short, the instance profile defines the permissions an EC2 instance has, because it is associated with an IAM role that has IAM policies attached. The AWS credentials exposed through the EC2 instance metadata are also rotated automatically.

In a hybrid environment (e.g. on-prem + AWS cloud), it is not uncommon to run AWS-related tasks from on-prem servers (e.g. deploying a CloudFormation stack from an on-prem Bamboo server). How do you normally manage the AWS credentials on those servers? The common practice I have seen is to use either an IAM user’s credentials or a service account that assumes an IAM role (federated access). Neither looks ideal to me, because there is still overhead in managing the IAM user or the service account (e.g. rotating the password). So I wondered: is it possible to set up something like an instance profile for on-prem servers?

I worked out a solution while working on the Confluence and Jira AWS migration project. About 2 TB of data needed to be migrated from on-prem to AWS, and I chose to use S3 sync (why not AWS DataSync?). Running the AWS S3 CLI on the on-prem servers requires AWS credentials. For security and to reduce management overhead, I designed a solution that automatically rotates the AWS credentials on the on-prem server without impacting the running job: Renew AWS credential for a long run AWS CLI process.

The idea is simple:

  • Set up a CodeBuild project in AWS that does two things on a schedule: assume a least-privilege role, then pass the credentials to the on-prem server via Ansible, e.g. by updating the content of the file ~/.aws/temp.json:
{
  "Version": 1,
  "AccessKeyId": "an AWS access key",
  "SecretAccessKey": "your AWS secret access key",
  "SessionToken": "the AWS session token for temporary credentials", 
  "Expiration": "ISO8601 timestamp when the credentials expire"
}  
  • Set up a dynamic AWS profile on the on-prem server, e.g. in ~/.aws/credentials:
[default]
credential_process = cat ~/.aws/temp.json
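How the JSON file gets written is not shown above, so here is a minimal sketch of one way it could be done in Python. The function names `to_credential_process_payload` and `write_atomically` are hypothetical, and the atomic rename is my own assumption about how to avoid a long-running AWS CLI process reading a half-written file; it is not necessarily what the Ansible play did.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def to_credential_process_payload(sts_credentials: dict) -> dict:
    """Convert the `Credentials` dict returned by STS AssumeRole into the
    JSON shape that credential_process expects (hypothetical helper)."""
    expiration = sts_credentials["Expiration"]
    # boto3 returns a timezone-aware datetime; the file needs an ISO8601 string.
    if isinstance(expiration, datetime):
        expiration = expiration.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "Version": 1,
        "AccessKeyId": sts_credentials["AccessKeyId"],
        "SecretAccessKey": sts_credentials["SecretAccessKey"],
        "SessionToken": sts_credentials["SessionToken"],
        "Expiration": expiration,
    }


def write_atomically(path: str, payload: dict) -> None:
    """Write the payload to a temp file in the same directory, then rename it
    over the target so readers never see a partially written file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)  # created with mode 0600
    try:
        with os.fdopen(fd, "w") as handle:
            json.dump(payload, handle)
        os.replace(tmp_path, path)  # atomic on the same filesystem
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

With this in place, each scheduled run would call `write_atomically` on the temp.json path with the freshly assumed credentials, and the next `cat` from `credential_process` picks up the new values.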

And the solution worked great! We migrated the 2 TB of data without any issues. The pattern has much wider use cases, e.g. rotating the credentials for on-prem Bamboo agents.

The above “Instance Profile” for on-prem servers solution is still not easy enough. For example, port 22 has to be open between on-prem and the VPC to allow Ansible to SSH into the on-prem box.

Let’s call the above version 1. I recently worked out version 2, which is easier and more powerful, using Lambda, SSM managed instances and SSM Run Command. The idea is still the same, but the implementation is better: use Lambda to assume the role, then pass the credentials to the SSM managed instances (mi-*, see Setting up AWS Systems Manager for hybrid environments) via SSM Run Command. The benefits are less management overhead and the ability to target multiple instances by tag.

Here is the sample Lambda function.

import os
import json
import logging
import time
from typing import Optional

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sts_client = boto3.client("sts")
ssm_client = boto3.client("ssm")

ROLE_ARN = os.environ['ROLE_ARN']
DOCUMENT_NAME = 'AWS-RunShellScript'
LOG_GROUP = 'ssm-rotate-on-prem-bamboo-credentials'
SESSION_NAME = 'rotate-on-prem-bamboo-credentials'
# Target every managed instance tagged on-prem=bamboo-agent
COMMAND_TARGET = [
    {
        'Key': 'tag:on-prem',
        'Values': ['bamboo-agent']
    },
]


def assume_role_for_credential(role_arn: str, session_name: str) -> dict:
    """Assume IAM role then return the credential"""
    credential = {}
    role = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName=session_name
    )
    credential['AccessKeyId'] = role['Credentials']['AccessKeyId']
    credential['SecretAccessKey'] = role['Credentials']['SecretAccessKey']
    credential['SessionToken'] = role['Credentials']['SessionToken']
    return credential


def generate_command_params() -> dict:
    """Form the params for run command"""
    params = {
        'workingDirectory': [''],
        'executionTimeout': ['300'],
        'commands': []
    }
    credential = assume_role_for_credential(ROLE_ARN, SESSION_NAME)
    # Quote [default] so the shell does not treat it as a glob pattern
    params['commands'].append("echo '[default]' > ~/.aws/credentials")
    params['commands'].append(f"echo aws_access_key_id={credential['AccessKeyId']} >> ~/.aws/credentials")
    params['commands'].append(f"echo aws_secret_access_key={credential['SecretAccessKey']} >> ~/.aws/credentials")
    params['commands'].append(f"echo aws_session_token={credential['SessionToken']} >> ~/.aws/credentials")
    # Do not print or log params: they contain the secret key and session token
    return params


def is_response_code_200(response: dict) -> bool:
    """Check if response code is 200"""
    try:
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            logger.info('Response code is 200')
            return True
        else:
            logger.error(f"Response code is {response['ResponseMetadata']['HTTPStatusCode']}")
            return False
    except Exception as e:
        logger.error(f'Response error: {e}')
        return False


def send_command(params: dict) -> Optional[str]:
    """Send SSM run command; return the command ID, or None on failure"""
    try:
        response = ssm_client.send_command(
            Targets=COMMAND_TARGET,
            DocumentName=DOCUMENT_NAME,
            Parameters=params,
            CloudWatchOutputConfig={
                'CloudWatchLogGroupName': LOG_GROUP,
                'CloudWatchOutputEnabled': True
            },
            TimeoutSeconds=120
        )
        if is_response_code_200(response):
            # Log only the ID: the full response echoes the command
            # parameters, which include the credentials
            command_id = response['Command']['CommandId']
            logger.info(f'Command is sent successfully: {command_id}')
            return command_id
        else:
            logger.error('Command is sent, but response is not 200')
            return None
    except Exception as e:
        logger.error(f'Command could not be sent: {e}')
        return None


def is_command_invocation_success(command_id: str) -> bool:
    """Check if SSM run command invocation status is success"""
    timewait = 1
    while True:
        response = ssm_client.list_command_invocations(
            CommandId=command_id,
            Details=False
        )
        if is_response_code_200(response):
            if response['CommandInvocations']:
                # Only the first invocation is checked here
                invocation_status = response['CommandInvocations'][0]['Status']
                if invocation_status != 'Pending':
                    if invocation_status == 'InProgress' or invocation_status == 'Success':
                        logger.info(f'Command invocation status: {invocation_status}')
                        if invocation_status == 'Success':
                            return True
                    else:
                        logger.error(f'Command invocation failed: {response}')
                        return False
        # Back off exponentially between polls
        time.sleep(timewait)
        timewait += timewait


def lambda_handler(event, context):
    """Lambda Handler"""
    logger.info(json.dumps(event))
    command_params = generate_command_params()
    command_id = send_command(command_params)
    if command_id is None:
        # Nothing to poll if the command was never sent
        return
    is_command_invocation_success(command_id)


if __name__ == '__main__':
    lambda_handler({}, {})
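The polling loop in the sample relies on the Lambda timeout to stop it and only looks at the first invocation. As a rough sketch of an alternative (not part of the original function), the variant below gives up after a fixed wait and checks every invocation returned. The name `wait_for_invocations`, the `max_wait_seconds` default and the injectable `sleep` parameter are my own assumptions for illustration. It also reads only the first page of results, so it would need pagination for large fleets, and with tag-based targeting the list may not yet contain every instance on an early poll.

```python
import time
from typing import Callable

# Invocation statuses after which the command will not succeed
TERMINAL_FAILURES = {"Cancelled", "TimedOut", "Failed"}


def wait_for_invocations(
    ssm_client,
    command_id: str,
    max_wait_seconds: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll SSM until all listed invocations succeed, any of them fails,
    or max_wait_seconds of sleeping has elapsed (hypothetical helper)."""
    delay = 1.0
    waited = 0.0
    while True:
        response = ssm_client.list_command_invocations(
            CommandId=command_id,
            Details=False,
        )
        statuses = [inv["Status"] for inv in response.get("CommandInvocations", [])]
        if statuses and all(status == "Success" for status in statuses):
            return True
        if any(status in TERMINAL_FAILURES for status in statuses):
            return False
        if waited >= max_wait_seconds:
            return False
        # Exponential backoff, capped so the total never exceeds the budget
        step = min(delay, max_wait_seconds - waited)
        sleep(step)
        waited += step
        delay *= 2
```

In the handler this could be called as `wait_for_invocations(ssm_client, command_id)` in place of `is_command_invocation_success(command_id)`, with `max_wait_seconds` kept below the Lambda timeout.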
