批量导出AWS所有LB负载均衡器详细配置至Excel表格

批量导出AWS所有LB负载均衡器详细配置至Excel表格

Deng YongJie's blog 39 2025-05-04

负载均衡器配置信息,直接使用Python脚本查询elbv2和elb的配置

import json
import csv
import os
import subprocess


# AWS凭证通过环境变量获取(避免硬编码)
# ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
# SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# REGION = 'ap-southeast-1'

# 设置访问密钥和区域
ACCESS_KEY = 'AKIARPNIxxxxxx'
SECRET_KEY = 'NYt5Qw2RSj0/a5lDih7xxxxxx'
REGION = 'ap-southeast-1'

def get_load_balancer_details(lb_arn, lb_type='elbv2'):
    if lb_type == 'elbv2':
        try:
            result = subprocess.run(
                [
                    'aws', 'elbv2', 'describe-load-balancers',
                    '--load-balancer-arns', lb_arn,
                    '--region', REGION,
                    '--output', 'json'
                ],
                env={
                    **os.environ,
                    'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                    'AWS_SECRET_ACCESS_KEY': SECRET_KEY
                },
                capture_output=True, text=True, check=True
            )
            return json.loads(result.stdout)['LoadBalancers'][0]
        except subprocess.CalledProcessError as e:
            print(f"Error fetching load balancer details for ARN {lb_arn}: {e}")
            return {}
    elif lb_type == 'elb':
        try:
            result = subprocess.run(
                [
                    'aws', 'elb', 'describe-load-balancers',
                    '--load-balancer-names', lb_arn,
                    '--region', REGION,
                    '--output', 'json'
                ],
                env={
                    **os.environ,
                    'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                    'AWS_SECRET_ACCESS_KEY': SECRET_KEY
                },
                capture_output=True, text=True, check=True
            )
            return json.loads(result.stdout)['LoadBalancerDescriptions'][0]
        except subprocess.CalledProcessError as e:
            print(f"Error fetching load balancer details for name {lb_arn}: {e}")
            return {}

def get_listeners(lb_arn):
    try:
        result = subprocess.run(
            [
                'aws', 'elbv2', 'describe-listeners',
                '--load-balancer-arn', lb_arn,
                '--region', REGION,
                '--output', 'json'
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY
            },
            capture_output=True, text=True, check=True
        )
        return json.loads(result.stdout)['Listeners']
    except subprocess.CalledProcessError as e:
        print(f"Error fetching listeners for load balancer ARN {lb_arn}: {e}")
        return []


def get_listeners_for_elb(lb_name):
    try:
        result = subprocess.run(
            [
                'aws','elb','describe-load-balancers',
                '--load-balancer-name',lb_name,
                '--region',REGION,
                '--output','json'
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY
            },
            capture_output=True,text=True,check=True
        )
        lb_detail = json.loads(result.stdout)['LoadBalancerDescriptions'][0]
        listeners = lb_detail.get('ListenerDescriptions',[])

        # 格式化侦听器信息
        listeners_str = '\n'.join([
            f"{listener['Listener']['Protocol']} {listener['Listener']['LoadBalancerPort']} -> "
            f"{listener['Listener'].get('InstanceProtocol','')} {listener['Listener'].get('InstancePort','')}"
            for listener in listeners
        ])
        return listeners_str
    except subprocess.CalledProcessError as e:
        print(f"Error fetching listeners for classic load balancer {lb_name}: {e}")
        return ''


def get_subnets(availability_zones, subnets, lb_type='elb'):
    if lb_type == 'elbv2':
        # 对于 ELBv2 类型,子网信息位于 AvailabilityZones 中的每个可用区对象里
        subnets_list = [az['SubnetId'] for az in availability_zones]
    else:
        # 对于经典 ELB 类型,子网信息直接是一个列表
        subnets_list = subnets
    return '\n'.join(subnets_list)


def get_target_groups(lb_arn):
    try:
        result = subprocess.run(
            [
                'aws', 'elbv2', 'describe-target-groups',
                '--load-balancer-arn', lb_arn,
                '--region', REGION,
                '--output', 'json'
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY
            },
            capture_output=True, text=True, check=True
        )
        return json.loads(result.stdout)['TargetGroups']
    except subprocess.CalledProcessError as e:
        print(f"Error fetching target groups for ARN {lb_arn}: {e}")
        return []

def get_health_check(tg_arn):
    try:
        result = subprocess.run(
            [
                'aws', 'elbv2', 'describe-target-health',
                '--target-group-arn', tg_arn,
                '--region', REGION,
                '--output', 'json'
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY
            },
            capture_output=True, text=True, check=True
        )
        return json.loads(result.stdout)
    except subprocess.CalledProcessError as e:
        print(f"Error fetching health check for target group {tg_arn}: {e}")
        return {}

def extract_listener_target_groups(listeners):
    target_groups = []
    for listener in listeners:
        actions = listener.get('DefaultActions', [])
        for action in actions:
            if action['Type'] == 'forward':
                target_group_arn = action['TargetGroupArn']
                # 从 ARN 中提取目标组名称
                parts = target_group_arn.split('/')
                if len(parts) > 1:
                    target_group_name = parts[-2]  # 目标组名称位于倒数第二个位置
                    target_groups.append(target_group_name)
                else:
                    print(f"无法从 ARN {target_group_arn} 中提取目标组名称")
    return '; '.join(target_groups)

def get_attributes(lb_name_or_arn, lb_type):
    if lb_type == 'elbv2':
        try:
            result = subprocess.run(
                [
                    'aws', 'elbv2', 'describe-load-balancer-attributes',
                    '--load-balancer-arn', lb_name_or_arn,
                    '--region', REGION,
                    '--output', 'json'
                ],
                env={
                    **os.environ,
                    'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                    'AWS_SECRET_ACCESS_KEY': SECRET_KEY
                },
                capture_output=True, text=True, check=True
            )
            attributes_list = json.loads(result.stdout)['Attributes']
            attributes_dict = {attr['Key']: attr['Value'] for attr in attributes_list}
            return attributes_dict
        except subprocess.CalledProcessError as e:
            print(f"Error fetching attributes for elbv2 {lb_name_or_arn}: {e}")
            return {}
    elif lb_type == 'elb':
        # 经典 ELB 需要使用 load-balancer-name 而不是 ARN
        try:
            result = subprocess.run(
                [
                    'aws', 'elb', 'describe-load-balancer-attributes',
                    '--load-balancer-name', lb_name_or_arn,
                    '--region', REGION,
                    '--output', 'json'
                ],
                env={
                    **os.environ,
                    'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                    'AWS_SECRET_ACCESS_KEY': SECRET_KEY
                },
                capture_output=True, text=True, check=True
            )
            attributes_dict = {}
            attributes_json = json.loads(result.stdout)
            for attr_key,attr_value in attributes_json['LoadBalancerAttributes'].items():
                if isinstance(attr_value,dict) and 'Enabled' in attr_value:
                    attributes_dict[attr_key] = attr_value['Enabled']
                else:
                    attributes_dict[attr_key] = attr_value
            return attributes_dict
        except subprocess.CalledProcessError as e:
            print(f"Error fetching attributes for classic elb {lb_name_or_arn}: {e}")
            return {}


def get_instance_ids_for_classic_lb(lb_name):
    try:
        result = subprocess.run(
            [
                'aws', 'elb', 'describe-instance-health',
                '--load-balancer-name', lb_name,
                '--region', REGION,
                '--output', 'json'
            ],
            env={
                **os.environ,
                'AWS_ACCESS_KEY_ID': ACCESS_KEY,
                'AWS_SECRET_ACCESS_KEY': SECRET_KEY
            },
            capture_output=True, text=True, check=True
        )
        instance_ids = [instance['InstanceId'] for instance in json.loads(result.stdout)['InstanceStates']]
        return '\n'.join(instance_ids)
    except subprocess.CalledProcessError as e:
        print(f"Error fetching instance IDs for classic load balancer {lb_name}: {e}")
        return ''


def get_health_check_info_for_classic_lb(lb_detail):
    health_check = lb_detail.get('HealthCheck', {})
    if health_check:
        return (
            f"HealthyThreshold: {health_check.get('HealthyThreshold')}\n"
            f"Interval: {health_check.get('Interval')}\n"
            f"Target: {health_check.get('Target')}\n"
            f"Timeout: {health_check.get('Timeout')}\n"
            f"UnhealthyThreshold: {health_check.get('UnhealthyThreshold')}"
        )
    else:
        return ''


# 获取所有负载均衡器基本信息
try:
    # 获取 ELBv2 类型的负载均衡器
    result_v2 = subprocess.run(
        [
            'aws', 'elbv2', 'describe-load-balancers',
            '--query', 'LoadBalancers[*].[LoadBalancerName, LoadBalancerArn]',
            '--region', REGION,
            '--output', 'json'
        ],
        env={
            **os.environ,
            'AWS_ACCESS_KEY_ID': ACCESS_KEY,
            'AWS_SECRET_ACCESS_KEY': SECRET_KEY
        },
        capture_output=True, text=True, check=True
    )
    load_balancers_v2 = json.loads(result_v2.stdout)

    # 获取经典负载均衡器(ELB)
    result_elb = subprocess.run(
        [
            'aws', 'elb', 'describe-load-balancers',
            '--query', 'LoadBalancerDescriptions[*].[LoadBalancerName]',
            '--region', REGION,
            '--output', 'json'
        ],
        env={
            **os.environ,
            'AWS_ACCESS_KEY_ID': ACCESS_KEY,
            'AWS_SECRET_ACCESS_KEY': SECRET_KEY
        },
        capture_output=True, text=True, check=True
    )
    load_balancers_elb = json.loads(result_elb.stdout)

except subprocess.CalledProcessError as e:
    print(f"Error fetching load balancers: {e}")
    exit(1)

# 定义CSV头部
fields = ['负载均衡器名称', 'DNS 名称', '负载均衡类型', '负载均衡模式', '安全组', '可用区', '子网', '侦听器', '健康检查', '目标实例', '跨区域负载均衡配置', '删除保护', '转发目标组']

# 准备写入CSV
with open('load_balancers_info.csv', 'w', newline=''') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(fields)

    # 遍历每个elbv2类型的负载均衡器并写入 CSV
    for lb in load_balancers_v2:
        lb_name = lb[0]
        lb_arn = lb[1]

        # 获取负载均衡器详细信息
        lb_detail = get_load_balancer_details(lb_arn, lb_type='elbv2')
        dns_name = lb_detail.get('DNSName', '')
        lb_type = lb_detail.get('Type', '')
        lb_scheme = lb_detail.get('Scheme', '')
        lb_securitygroups = '\n'.join(lb_detail.get('SecurityGroups', []))
        availability_zones = lb_detail.get('AvailabilityZones', [])

        # 获取子网信息
        subnets_str = get_subnets(availability_zones,[],lb_type='elbv2')

        # 获取侦听器信息
        listeners = get_listeners(lb_arn)
        listeners_str = '; '.join([f"{listener['Protocol']} {listener['Port']}" for listener in listeners])

        # 提取侦听器中的转发目标组
        target_groups_str = extract_listener_target_groups(listeners)

        # 获取目标组和健康检查信息
        target_groups = get_target_groups(lb_arn)
        health_checks = []

        for tg in target_groups:
            tg_arn = tg['TargetGroupArn']
            health_check_result = get_health_check(tg_arn)

            for thd in health_check_result.get('TargetHealthDescriptions', []):
                target_id = thd['Target']['Id']
                target_port = thd['Target']['Port']
                health_check_port = thd['HealthCheckPort']
                state = thd['TargetHealth']['State']
                reason = thd['TargetHealth'].get('Reason', '')
                description = thd['TargetHealth'].get('Description', '')

                health_check_info = (
                    f"Id: {target_id}\n"
                    f"HealthCheckPort: {health_check_port}\n"
                    f"Port: {target_port}\n"
                    f"State: {state}\n"
                    f"Reason: {reason}\n"
                    f"Description: {description}\n"
                )
                health_checks.append(health_check_info)

        health_check_str = '\n'.join(health_checks)  # 使用换行符分隔每条健康检查信息

        # 获取属性信息
        attributes = get_attributes(lb_arn,lb_type='elbv2')
        deletion_protection = attributes.get('deletion_protection.enabled','false')
        cross_zone_load_balancing = attributes.get('load_balancing.cross_zone.enabled','false')

        # 目标实例留空
        target_instances_str = 'Null'

        # 写入行
        writer.writerow([
            lb_name,  # 负载均衡器名称
            dns_name,  # DNS 名称
            lb_type,  # 负载均衡类型
            lb_scheme,  # 负载均衡模式
            lb_securitygroups,  # 安全组
            '\n'.join([az['ZoneName'] for az in availability_zones]),  # 可用区
            subnets_str,  # 子网
            listeners_str,  # 侦听器
            health_check_str,  # 健康检查
            target_instances_str,  # 目标实例(elbv2没有目标实例信息)
            cross_zone_load_balancing,  # 跨区域负载均衡配置
            deletion_protection,  # 删除保护
            target_groups_str  # 转发目标组
        ])

    # 遍历每个经典负载均衡器(ELB)并写入 CSV
    for lb in load_balancers_elb:
        lb_name = lb[0]

        # 获取负载均衡器详细信息
        lb_detail = get_load_balancer_details(lb_name, lb_type='elb')
        dns_name = lb_detail.get('DNSName', '')
        lb_type = 'classic'
        lb_scheme = lb_detail.get('Scheme', '')
        # 处理 SecurityGroups
        security_groups = lb_detail.get('SecurityGroups',[])
        if isinstance(security_groups,list):
            lb_securitygroups = '\n'.join([sg for sg in security_groups])
        else:
            lb_securitygroups = security_groups

        availability_zones = lb_detail.get('AvailabilityZones', [])

        # 获取子网信息
        subnets_str = get_subnets([], lb_detail.get('Subnets', []), lb_type='elb')

        # 获取侦听器信息
        listeners_str = get_listeners_for_elb(lb_name)
        # 获取目标实例信息
        target_instances_str = get_instance_ids_for_classic_lb(lb_name)

        # 获取健康检查信息
        health_check_str = get_health_check_info_for_classic_lb(lb_detail)

        # 获取属性信息
        attributes = get_attributes(lb_name,lb_type='elb')
        cross_zone_load_balancing = attributes.get('CrossZoneLoadBalancing','false')

        # 经典负载均衡器不支持删除保护、目标组概念,所以这些字段留空
        deletion_protection = 'Null'
        target_groups_str = 'Null'

        # 写入行
        writer.writerow([
            lb_name,  # 负载均衡器名称
            dns_name,  # DNS 名称
            lb_type,  # 负载均衡类型
            lb_scheme,  # 负载均衡模式
            lb_securitygroups,  # 安全组
            '\n'.join(availability_zones),  # 可用区
            subnets_str,  # 子网
            listeners_str,  # 侦听器
            health_check_str,  # 健康检查
            target_instances_str,  # 目标实例
            cross_zone_load_balancing,  # 跨区域负载均衡配置
            deletion_protection,  # 删除保护(经典负载均衡器不支持)
            target_groups_str  # 转发目标组
        ])

print("负载均衡器信息已成功提取并保存到 load_balancers_info.csv 文件中。")

最终展示结果:

image-20250314111431330