# Load balancer configuration report: query the elbv2 and classic elb settings directly from a Python script.
import json
import csv
import os
import subprocess
# AWS credentials are taken from the environment so that no secret is stored
# in the script. The '' default keeps both values strings, which is required
# because they are placed into the `env` mapping passed to subprocess.run.
ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID', '')
SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', '')
# Region passed as --region to every AWS CLI call in this script.
REGION = 'ap-southeast-1'
def get_load_balancer_details(lb_arn, lb_type='elbv2'):
    """Return the description of one load balancer, fetched with the AWS CLI.

    Args:
        lb_arn: The load balancer ARN when ``lb_type`` is ``'elbv2'``; the
            load balancer *name* when ``lb_type`` is ``'elb'``.
        lb_type: ``'elbv2'`` or ``'elb'`` (classic).

    Returns:
        The first entry of ``LoadBalancers`` (elbv2) or
        ``LoadBalancerDescriptions`` (elb) from the CLI's JSON output.
        An empty dict is returned when ``lb_type`` is not recognised, when
        the CLI exits with an error, or when the response holds no entry, so
        callers can always use ``.get`` on the result.
    """
    # lb_type -> (CLI service, identifier option, response key, error label)
    cli_variants = {
        'elbv2': ('elbv2', '--load-balancer-arns', 'LoadBalancers', 'ARN'),
        'elb': ('elb', '--load-balancer-names', 'LoadBalancerDescriptions', 'name'),
    }
    if lb_type not in cli_variants:
        return {}
    service, id_option, response_key, label = cli_variants[lb_type]

    try:
        result = subprocess.run(
            [
                'aws', service, 'describe-load-balancers',
                id_option, lb_arn,
                '--region', REGION,
                '--output', 'json',
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY,
            },
            capture_output=True, text=True, check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching load balancer details for {label} {lb_arn}: {e}")
        return {}

    # Guard against an empty result list instead of raising IndexError.
    descriptions = json.loads(result.stdout).get(response_key) or []
    return descriptions[0] if descriptions else {}
def get_listeners(lb_arn):
    """Return the ``Listeners`` list of an elbv2 load balancer.

    Runs ``aws elbv2 describe-listeners`` for ``lb_arn`` in ``REGION`` and
    parses its JSON output. If the CLI exits with an error, the error is
    printed and an empty list is returned.
    """
    command = [
        'aws', 'elbv2', 'describe-listeners',
        '--load-balancer-arn', lb_arn,
        '--region', REGION,
        '--output', 'json',
    ]
    # Inherit the current environment and override only the credentials.
    cli_env = dict(
        os.environ,
        AWS_ACCESS_KEY_ID=ACCESS_KEY,
        AWS_SECRET_ACCESS_KEY=SECRET_KEY,
    )
    try:
        completed = subprocess.run(
            command, env=cli_env, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching listeners for load balancer ARN {lb_arn}: {e}")
        return []
    else:
        return json.loads(completed.stdout)['Listeners']
def get_listeners_for_elb(lb_name):
    """Return the listeners of a classic load balancer as display text.

    Runs ``aws elb describe-load-balancers`` for ``lb_name`` and formats each
    entry of ``ListenerDescriptions`` as
    ``"<Protocol> <LoadBalancerPort> -> <InstanceProtocol> <InstancePort>"``,
    one listener per line.

    Returns:
        The formatted text, or ``''`` when the CLI exits with an error or the
        response contains no load balancer description.
    """
    try:
        result = subprocess.run(
            [
                'aws', 'elb', 'describe-load-balancers',
                # Documented option name; matches get_load_balancer_details.
                '--load-balancer-names', lb_name,
                '--region', REGION,
                '--output', 'json',
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY,
            },
            capture_output=True, text=True, check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching listeners for classic load balancer {lb_name}: {e}")
        return ''

    descriptions = json.loads(result.stdout).get('LoadBalancerDescriptions') or []
    if not descriptions:
        # Nothing was returned for this name; avoid IndexError on [0].
        return ''

    lines = []
    for description in descriptions[0].get('ListenerDescriptions', []):
        listener = description['Listener']
        lines.append(
            f"{listener['Protocol']} {listener['LoadBalancerPort']} -> "
            f"{listener.get('InstanceProtocol', '')} {listener.get('InstancePort', '')}"
        )
    return '\n'.join(lines)
def get_subnets(availability_zones, subnets, lb_type='elb'):
    """Return the subnet IDs of a load balancer, one per line.

    For ``lb_type == 'elbv2'`` the IDs are read from the ``SubnetId`` key of
    each item in ``availability_zones``; for any other ``lb_type`` the
    ``subnets`` list is joined as given.
    """
    if lb_type != 'elbv2':
        # Classic ELB: the caller already has a flat list of subnet IDs.
        return '\n'.join(subnets)

    subnet_ids = []
    for zone in availability_zones:
        subnet_ids.append(zone['SubnetId'])
    return '\n'.join(subnet_ids)
def get_target_groups(lb_arn):
    """Return the ``TargetGroups`` list attached to an elbv2 load balancer.

    Runs ``aws elbv2 describe-target-groups`` for ``lb_arn`` in ``REGION``.
    If the CLI exits with an error, the error is printed and an empty list is
    returned.
    """
    command = [
        'aws', 'elbv2', 'describe-target-groups',
        '--load-balancer-arn', lb_arn,
        '--region', REGION,
        '--output', 'json',
    ]
    # Inherit the current environment and override only the credentials.
    cli_env = dict(
        os.environ,
        AWS_ACCESS_KEY_ID=ACCESS_KEY,
        AWS_SECRET_ACCESS_KEY=SECRET_KEY,
    )
    try:
        completed = subprocess.run(
            command, env=cli_env, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching target groups for ARN {lb_arn}: {e}")
        return []
    else:
        return json.loads(completed.stdout)['TargetGroups']
def get_health_check(tg_arn):
    """Return the parsed output of ``describe-target-health`` for a target group.

    Runs ``aws elbv2 describe-target-health`` for ``tg_arn`` in ``REGION`` and
    returns the whole JSON document as a dict. If the CLI exits with an
    error, the error is printed and an empty dict is returned.
    """
    command = [
        'aws', 'elbv2', 'describe-target-health',
        '--target-group-arn', tg_arn,
        '--region', REGION,
        '--output', 'json',
    ]
    # Inherit the current environment and override only the credentials.
    cli_env = dict(
        os.environ,
        AWS_ACCESS_KEY_ID=ACCESS_KEY,
        AWS_SECRET_ACCESS_KEY=SECRET_KEY,
    )
    try:
        completed = subprocess.run(
            command, env=cli_env, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching health check for target group {tg_arn}: {e}")
        return {}
    else:
        return json.loads(completed.stdout)
def extract_listener_target_groups(listeners):
    """Return the names of the target groups that listeners forward to.

    Every ``forward`` action in each listener's ``DefaultActions`` is
    inspected. Target group ARNs are taken from the action's
    ``TargetGroupArn`` and from ``ForwardConfig.TargetGroups`` (the only place
    they appear for weighted forwarding); an ARN present in both is counted
    once per action.

    Args:
        listeners: Listener dicts as returned by ``describe-listeners``.

    Returns:
        The target group names joined with ``'; '``, in encounter order.
        ARNs that do not have the ``.../<name>/<id>`` shape are reported on
        stdout and skipped.
    """
    target_groups = []
    for listener in listeners:
        for action in listener.get('DefaultActions', []):
            if action.get('Type') != 'forward':
                continue

            # Collect this action's ARNs without duplicates, keeping order.
            arns = []
            if action.get('TargetGroupArn'):
                arns.append(action['TargetGroupArn'])
            forward_config = action.get('ForwardConfig') or {}
            for entry in forward_config.get('TargetGroups', []):
                arn = entry.get('TargetGroupArn')
                if arn and arn not in arns:
                    arns.append(arn)

            for target_group_arn in arns:
                parts = target_group_arn.split('/')
                # "...:targetgroup/<name>/<id>" splits into three pieces;
                # with fewer, parts[-2] would not be the name.
                if len(parts) >= 3:
                    target_groups.append(parts[-2])
                else:
                    print(f"无法从 ARN {target_group_arn} 中提取目标组名称")
    return '; '.join(target_groups)
def get_attributes(lb_name_or_arn, lb_type):
    """Return a load balancer's attributes as a flat dict.

    Args:
        lb_name_or_arn: The ARN for ``lb_type == 'elbv2'``; the load balancer
            name for ``lb_type == 'elb'`` (the classic CLI takes a name).
        lb_type: ``'elbv2'`` or ``'elb'``.

    Returns:
        * elbv2: ``{Key: Value}`` built from the response's ``Attributes``.
        * elb: one entry per key of ``LoadBalancerAttributes``; a dict value
          that has an ``Enabled`` key is reduced to that flag, anything else
          is kept as returned.
        * ``{}`` when ``lb_type`` is not recognised or the CLI exits with an
          error, so callers can always use ``.get`` on the result.
    """
    if lb_type == 'elbv2':
        service, id_option, label = 'elbv2', '--load-balancer-arn', 'elbv2'
    elif lb_type == 'elb':
        service, id_option, label = 'elb', '--load-balancer-name', 'classic elb'
    else:
        return {}

    try:
        result = subprocess.run(
            [
                'aws', service, 'describe-load-balancer-attributes',
                id_option, lb_name_or_arn,
                '--region', REGION,
                '--output', 'json',
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY,
            },
            capture_output=True, text=True, check=True,
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching attributes for {label} {lb_name_or_arn}: {e}")
        return {}

    payload = json.loads(result.stdout)
    if lb_type == 'elbv2':
        return {attr['Key']: attr['Value'] for attr in payload['Attributes']}

    attributes_dict = {}
    for attr_key, attr_value in payload['LoadBalancerAttributes'].items():
        if isinstance(attr_value, dict) and 'Enabled' in attr_value:
            attributes_dict[attr_key] = attr_value['Enabled']
        else:
            attributes_dict[attr_key] = attr_value
    return attributes_dict
def get_instance_ids_for_classic_lb(lb_name):
    """Return the instance IDs registered with a classic load balancer.

    Runs ``aws elb describe-instance-health`` for ``lb_name`` in ``REGION``
    and joins the ``InstanceId`` of every entry in ``InstanceStates`` with
    newlines. If the CLI exits with an error, the error is printed and ``''``
    is returned.
    """
    command = [
        'aws', 'elb', 'describe-instance-health',
        '--load-balancer-name', lb_name,
        '--region', REGION,
        '--output', 'json',
    ]
    # Inherit the current environment and override only the credentials.
    cli_env = dict(
        os.environ,
        AWS_ACCESS_KEY_ID=ACCESS_KEY,
        AWS_SECRET_ACCESS_KEY=SECRET_KEY,
    )
    try:
        completed = subprocess.run(
            command, env=cli_env, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        print(f"Error fetching instance IDs for classic load balancer {lb_name}: {e}")
        return ''
    else:
        states = json.loads(completed.stdout)['InstanceStates']
        return '\n'.join(state['InstanceId'] for state in states)
def get_health_check_info_for_classic_lb(lb_detail):
    """Format the ``HealthCheck`` section of a classic load balancer.

    Returns one ``"<field>: <value>"`` line for each of HealthyThreshold,
    Interval, Target, Timeout and UnhealthyThreshold, in that order (a
    missing field is rendered as ``None``). Returns ``''`` when ``lb_detail``
    has no non-empty ``HealthCheck`` entry.
    """
    health_check = lb_detail.get('HealthCheck', {})
    if not health_check:
        return ''

    field_order = (
        'HealthyThreshold',
        'Interval',
        'Target',
        'Timeout',
        'UnhealthyThreshold',
    )
    return '\n'.join(
        f"{field}: {health_check.get(field)}" for field in field_order
    )
# Fetch the identity of every load balancer in REGION. Because of --query,
# each elbv2 entry is a [name, arn] pair and each classic ELB entry is a
# one-element [name] list.
_listing_env = {
    **os.environ,
    'AWS_ACCESS_KEY_ID': ACCESS_KEY,
    'AWS_SECRET_ACCESS_KEY': SECRET_KEY,
}
try:
    # elbv2 load balancers
    result_v2 = subprocess.run(
        [
            'aws', 'elbv2', 'describe-load-balancers',
            '--query', 'LoadBalancers[*].[LoadBalancerName, LoadBalancerArn]',
            '--region', REGION,
            '--output', 'json',
        ],
        env=_listing_env,
        capture_output=True, text=True, check=True,
    )
    load_balancers_v2 = json.loads(result_v2.stdout)
    # Classic load balancers (ELB)
    result_elb = subprocess.run(
        [
            'aws', 'elb', 'describe-load-balancers',
            '--query', 'LoadBalancerDescriptions[*].[LoadBalancerName]',
            '--region', REGION,
            '--output', 'json',
        ],
        env=_listing_env,
        capture_output=True, text=True, check=True,
    )
    load_balancers_elb = json.loads(result_elb.stdout)
except subprocess.CalledProcessError as e:
    print(f"Error fetching load balancers: {e}")
    # capture_output=True keeps the CLI's own diagnostics out of the
    # terminal, so show them here; otherwise the cause of the failure is lost.
    if e.stderr:
        print(e.stderr.strip())
    # SystemExit instead of exit(): exit() is injected by the `site` module
    # for interactive use and is not guaranteed to exist.
    raise SystemExit(1) from e
# CSV header row. The labels are program output and are written verbatim.
fields = ['负载均衡器名称', 'DNS 名称', '负载均衡类型', '负载均衡模式', '安全组', '可用区', '子网', '侦听器', '健康检查', '目标实例', '跨区域负载均衡配置', '删除保护', '转发目标组']
# newline='' is what the csv module requires of the file object. The encoding
# is explicit because the header is non-ASCII and the platform default may not
# be able to encode it; the BOM written by utf-8-sig helps spreadsheet
# applications detect UTF-8.
with open('load_balancers_info.csv', 'w', newline='', encoding='utf-8-sig') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(fields)

    # One row per elbv2 load balancer.
    for lb in load_balancers_v2:
        lb_name = lb[0]
        lb_arn = lb[1]

        # Basic description.
        lb_detail = get_load_balancer_details(lb_arn, lb_type='elbv2')
        dns_name = lb_detail.get('DNSName', '')
        lb_type = lb_detail.get('Type', '')
        lb_scheme = lb_detail.get('Scheme', '')
        lb_securitygroups = '\n'.join(lb_detail.get('SecurityGroups', []))
        availability_zones = lb_detail.get('AvailabilityZones', [])

        # Subnets.
        subnets_str = get_subnets(availability_zones, [], lb_type='elbv2')

        # Listeners. Protocol/Port are read with .get so a listener that
        # lacks them yields a blank entry instead of a KeyError.
        listeners = get_listeners(lb_arn)
        listeners_str = '; '.join(
            f"{listener.get('Protocol', '')} {listener.get('Port', '')}".strip()
            for listener in listeners
        )

        # Target groups referenced by the listeners' forward actions.
        target_groups_str = extract_listener_target_groups(listeners)

        # Health of every target in every attached target group.
        target_groups = get_target_groups(lb_arn)
        health_checks = []
        for tg in target_groups:
            tg_arn = tg['TargetGroupArn']
            health_check_result = get_health_check(tg_arn)
            for thd in health_check_result.get('TargetHealthDescriptions', []):
                target = thd.get('Target', {})
                target_health = thd.get('TargetHealth', {})
                # Optional keys are read with .get so one incomplete target
                # description does not abort the whole export.
                target_id = target.get('Id', '')
                target_port = target.get('Port', '')
                health_check_port = thd.get('HealthCheckPort', '')
                state = target_health.get('State', '')
                reason = target_health.get('Reason', '')
                description = target_health.get('Description', '')
                health_check_info = (
                    f"Id: {target_id}\n"
                    f"HealthCheckPort: {health_check_port}\n"
                    f"Port: {target_port}\n"
                    f"State: {state}\n"
                    f"Reason: {reason}\n"
                    f"Description: {description}\n"
                )
                health_checks.append(health_check_info)
        # Entries are separated by a newline inside the single CSV cell.
        health_check_str = '\n'.join(health_checks)

        # Attributes.
        attributes = get_attributes(lb_arn, lb_type='elbv2')
        deletion_protection = attributes.get('deletion_protection.enabled', 'false')
        cross_zone_load_balancing = attributes.get('load_balancing.cross_zone.enabled', 'false')

        # The target-instance column is not populated for elbv2.
        target_instances_str = 'Null'

        writer.writerow([
            lb_name,                    # load balancer name
            dns_name,                   # DNS name
            lb_type,                    # load balancer type
            lb_scheme,                  # scheme
            lb_securitygroups,          # security groups
            '\n'.join([az['ZoneName'] for az in availability_zones]),  # availability zones
            subnets_str,                # subnets
            listeners_str,              # listeners
            health_check_str,           # health checks
            target_instances_str,       # target instances (not populated for elbv2)
            cross_zone_load_balancing,  # cross-zone load balancing
            deletion_protection,        # deletion protection
            target_groups_str,          # forward target groups
        ])

    # One row per classic load balancer (ELB).
    for lb in load_balancers_elb:
        lb_name = lb[0]

        # Basic description.
        lb_detail = get_load_balancer_details(lb_name, lb_type='elb')
        dns_name = lb_detail.get('DNSName', '')
        lb_type = 'classic'
        lb_scheme = lb_detail.get('Scheme', '')

        # SecurityGroups is joined when it is a list and used as-is otherwise.
        security_groups = lb_detail.get('SecurityGroups', [])
        if isinstance(security_groups, list):
            lb_securitygroups = '\n'.join(security_groups)
        else:
            lb_securitygroups = security_groups
        availability_zones = lb_detail.get('AvailabilityZones', [])

        # Subnets.
        subnets_str = get_subnets([], lb_detail.get('Subnets', []), lb_type='elb')
        # Listeners.
        listeners_str = get_listeners_for_elb(lb_name)
        # Registered instances.
        target_instances_str = get_instance_ids_for_classic_lb(lb_name)
        # Health check configuration.
        health_check_str = get_health_check_info_for_classic_lb(lb_detail)

        # Attributes. get_attributes returns a bool for this flag on classic
        # ELBs; render it as lowercase text so the column matches the
        # 'true'/'false' strings written for elbv2 rows.
        attributes = get_attributes(lb_name, lb_type='elb')
        cross_zone_load_balancing = str(attributes.get('CrossZoneLoadBalancing', 'false')).lower()

        # Deletion protection and target groups are not reported for classic
        # load balancers, so those columns get a placeholder.
        deletion_protection = 'Null'
        target_groups_str = 'Null'

        writer.writerow([
            lb_name,                    # load balancer name
            dns_name,                   # DNS name
            lb_type,                    # load balancer type
            lb_scheme,                  # scheme
            lb_securitygroups,          # security groups
            '\n'.join(availability_zones),  # availability zones
            subnets_str,                # subnets
            listeners_str,              # listeners
            health_check_str,           # health check
            target_instances_str,       # target instances
            cross_zone_load_balancing,  # cross-zone load balancing
            deletion_protection,        # deletion protection (placeholder for classic)
            target_groups_str,          # forward target groups (placeholder for classic)
        ])

print("负载均衡器信息已成功提取并保存到 load_balancers_info.csv 文件中。")
# Final displayed result:
