Secrets Management: Comparing Vault, AWS KMS and Other Tools

Introduction

Managing secrets securely is one of the most critical aspects of modern DevOps and application security. Secrets include API keys, passwords, database credentials, encryption keys, certificates, and tokens. Hardcoding them in source code or storing them in plain-text configuration files creates vulnerabilities that can lead to data breaches, unauthorized access, and compliance failures. Tools such as HashiCorp Vault, AWS KMS and Secrets Manager, and Azure Key Vault exist to store, distribute, and rotate this sensitive data securely throughout the application lifecycle.

In this comprehensive guide, we’ll explore what these tools do, how they differ in architecture and capabilities, provide practical implementation examples, and help you choose the right solution for your infrastructure.

What Is Secrets Management?

Secrets management is the process of securely storing, controlling access to, rotating, and auditing sensitive credentials across applications, CI/CD pipelines, and infrastructure. Instead of embedding secrets directly in code or configuration files, they are stored in an encrypted vault and retrieved programmatically when needed.

The Problem with Traditional Approaches

Before examining solutions, let’s understand why traditional secret handling fails:

# DON'T DO THIS - Secrets in source code
class DatabaseConfig:
    HOST = "production-db.example.com"
    USERNAME = "admin"
    PASSWORD = "super_secret_password_123"  # Exposed in git history forever
    
# DON'T DO THIS - Secrets in environment files committed to repo
# .env file
DATABASE_URL=postgres://admin:secret@db.example.com/production
AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
STRIPE_SECRET_KEY=sk_live_4eC39HqLyjWDarjtT1zdp7dc

These approaches create multiple security issues: secrets appear in version control history, they’re difficult to rotate, there’s no access auditing, and a single repository breach exposes all credentials.
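
As a first step away from hardcoded values, an application can read its secrets from the process environment at startup and refuse to run when one is missing. The sketch below is a minimal illustration under that assumption; the helper name `load_required_secrets` and the `MissingSecretError` class are hypothetical, and the environment is expected to be populated by a secrets manager or the deployment platform rather than a committed `.env` file.

```python
import os
from typing import Dict, Iterable, Mapping


class MissingSecretError(RuntimeError):
    """Raised when a required secret is absent from the environment."""


def load_required_secrets(
    names: Iterable[str],
    environ: Mapping[str, str] = os.environ,
) -> Dict[str, str]:
    """Return the requested variables, failing fast if any is unset or empty."""
    names = list(names)
    missing = [name for name in names if not environ.get(name)]
    if missing:
        # Report only the variable names, never their values
        raise MissingSecretError(
            "Missing required secrets: " + ", ".join(sorted(missing))
        )
    return {name: environ[name] for name in names}


# Example: config = load_required_secrets(["DATABASE_URL", "STRIPE_SECRET_KEY"])
```

Failing at startup keeps a misconfigured deployment from running half-configured, and the error message avoids echoing any secret value into logs.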

Why Proper Secrets Management Matters

  • Security: Prevents unauthorized access and eliminates secret sprawl across repositories and systems.
  • Rotation: Enables automatic credential rotation without application downtime or code changes.
  • Auditing: Provides complete audit trails of who accessed what secrets and when.
  • Scalability: Centralizes credentials for consistent management across hundreds of services.
  • Compliance: Meets data protection standards like ISO 27001, SOC 2, GDPR, HIPAA, and PCI-DSS.

HashiCorp Vault: The Industry Standard

HashiCorp Vault is one of the most widely used and flexible secrets management tools. It provides encryption as a service, dynamic secrets, fine-grained access policies, and detailed audit logging in a single platform that runs in any cloud or on-premises environment.

Vault Architecture

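Vault uses a client-server architecture: a central server stores and encrypts secrets, and clients authenticate to it over an HTTP API. The Compose file below starts a single dev-mode server plus a one-shot init container. Dev mode keeps all data in memory, starts unsealed, and uses a fixed root token, so it is suitable only for local experiments: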

# docker-compose.yml - Vault Development Setup
version: '3.8'

services:
  vault:
    image: hashicorp/vault:1.15
    container_name: vault
    ports:
      - "8200:8200"
    environment:
      VAULT_DEV_ROOT_TOKEN_ID: "dev-root-token"
      VAULT_DEV_LISTEN_ADDRESS: "0.0.0.0:8200"
    cap_add:
      - IPC_LOCK
    volumes:
      - vault-data:/vault/data
      - ./config:/vault/config
    command: server -dev

  vault-init:
    image: hashicorp/vault:1.15
    depends_on:
      - vault
    environment:
      VAULT_ADDR: "http://vault:8200"
      VAULT_TOKEN: "dev-root-token"
    volumes:
      - ./scripts:/scripts
    entrypoint: /scripts/init-vault.sh

volumes:
  vault-data:
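
Once the container is up, it helps to confirm the server state before running any setup script. The sketch below queries Vault's `/v1/sys/health` endpoint with the standard library and maps the documented default status codes to a description. The function names and the default address are illustrative assumptions.

```python
import urllib.error
import urllib.request

# Status codes returned by GET /v1/sys/health with default query parameters
HEALTH_STATES = {
    200: "initialized, unsealed, active",
    429: "unsealed, standby",
    472: "disaster recovery secondary",
    473: "performance standby",
    501: "not initialized",
    503: "sealed",
}


def describe_health(status_code: int) -> str:
    """Translate a sys/health status code into a readable state."""
    return HEALTH_STATES.get(status_code, f"unexpected status {status_code}")


def vault_health(addr: str = "http://localhost:8200", timeout: float = 2.0) -> str:
    """Query the health endpoint and describe the result."""
    url = addr.rstrip("/") + "/v1/sys/health"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return describe_health(response.status)
    except urllib.error.HTTPError as err:
        # Non-2xx codes still carry meaning for this endpoint
        return describe_health(err.code)
    except (urllib.error.URLError, OSError) as err:
        return f"unreachable: {err}"


# Example: print(vault_health("http://localhost:8200"))
```

A dev-mode server should report the active state immediately, while a freshly started production server typically reports "not initialized" or "sealed" until it is initialized and unsealed.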

Vault Production Configuration

# vault-config.hcl - Production Configuration
storage "consul" {
  address = "consul.service.consul:8500"
  path    = "vault/"
  token   = "consul-acl-token"
}

listener "tcp" {
  address         = "0.0.0.0:8200"
  tls_cert_file   = "/vault/certs/vault.crt"
  tls_key_file    = "/vault/certs/vault.key"
  tls_min_version = "tls12"
}

api_addr = "https://vault.example.com:8200"
cluster_addr = "https://vault.example.com:8201"

ui = true

seal "awskms" {
  region     = "us-east-1"
  kms_key_id = "alias/vault-unseal-key"
}

telemetry {
  prometheus_retention_time = "30s"
  disable_hostname          = true
}

Enabling Secrets Engines

#!/bin/bash
# init-vault.sh - Initialize Vault with Secrets Engines

export VAULT_ADDR="http://vault:8200"
export VAULT_TOKEN="dev-root-token"

# Wait for Vault to be ready
until vault status; do
  echo "Waiting for Vault..."
  sleep 2
done

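# Enable KV secrets engine v2
# (dev mode already mounts KV v2 at secret/, so there this command reports that the path is in use)
vault secrets enable -path=secret kv-v2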

# Enable database secrets engine for dynamic credentials
vault secrets enable database

# Enable AWS secrets engine
vault secrets enable aws

# Enable PKI secrets engine for certificates
vault secrets enable pki

# Configure maximum lease TTL for PKI
vault secrets tune -max-lease-ttl=87600h pki

echo "Vault initialization complete"

Dynamic Database Credentials

One of Vault’s most powerful features is generating dynamic, short-lived database credentials:

# Configure PostgreSQL connection
vault write database/config/postgresql \
    plugin_name=postgresql-database-plugin \
    allowed_roles="app-readonly,app-readwrite" \
    connection_url="postgresql://{{username}}:{{password}}@db.example.com:5432/production?sslmode=require" \
    username="vault-admin" \
    password="vault-admin-password"

# Create a role for read-only access
vault write database/roles/app-readonly \
    db_name=postgresql \
    creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; \
        GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \
    default_ttl="1h" \
    max_ttl="24h"

# Create a role for read-write access
vault write database/roles/app-readwrite \
    db_name=postgresql \
    creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}'; \
        GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO \"{{name}}\";" \
    default_ttl="1h" \
    max_ttl="24h"

# Request dynamic credentials
vault read database/creds/app-readonly
# Returns temporary username/password that auto-expires
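
To make the role definition less abstract, the sketch below shows what the `creation_statements` template looks like after the placeholders are filled in. It is only an illustration of the substitution, not Vault's implementation, and the username, password, and expiration values are hypothetical stand-ins for what Vault generates per lease.

```python
def render_statement(template: str, name: str, password: str, expiration: str) -> str:
    """Substitute the {{name}}, {{password}} and {{expiration}} placeholders."""
    values = {"name": name, "password": password, "expiration": expiration}
    rendered = template
    for key, value in values.items():
        rendered = rendered.replace("{{" + key + "}}", value)
    return rendered


# Same statement as the app-readonly role above, joined onto one line
template = (
    "CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' "
    "VALID UNTIL '{{expiration}}'; "
    "GRANT SELECT ON ALL TABLES IN SCHEMA public TO \"{{name}}\";"
)

sql_text = render_statement(
    template,
    name="v-approle-app-read-EXAMPLE",      # hypothetical generated username
    password="EXAMPLE-ONLY",                # hypothetical generated password
    expiration="2030-01-01 00:00:00+0000",  # hypothetical lease expiry
)
print(sql_text)
```

Each call to `vault read database/creds/app-readonly` yields a different role name and password, so a leaked credential identifies a single lease and expires with it.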

Vault Policies for Access Control

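# app-policy.hcl - Application Policy
path "secret/data/{{identity.entity.name}}/*" {
  capabilities = ["create", "read", "update", "delete"]
}

# KV v2 serves list requests from the metadata/ prefix, not data/
path "secret/metadata/{{identity.entity.name}}/*" {
  capabilities = ["list"]
}

path "secret/data/shared/*" {
  capabilities = ["read"]
}

path "secret/metadata/shared/*" {
  capabilities = ["list"]
}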

path "database/creds/app-readonly" {
  capabilities = ["read"]
}

path "aws/creds/s3-readonly" {
  capabilities = ["read"]
}

# Deny access to admin paths
path "sys/*" {
  capabilities = ["deny"]
}

# Allow token renewal
path "auth/token/renew-self" {
  capabilities = ["update"]
}
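
The way a request path is checked against such a policy can be approximated in a few lines. The sketch below is a deliberately simplified model, not Vault's actual algorithm: it handles only exact paths and a trailing `*` glob, picks the longest matching rule, and treats `deny` as overriding everything else. The function name `capabilities_for` and the sample policy dictionary are illustrative.

```python
from typing import Dict, List


def capabilities_for(path: str, policy: Dict[str, List[str]]) -> List[str]:
    """Return the capabilities of the most specific rule matching `path`.

    Simplification: only exact paths and a trailing '*' glob are handled,
    and the longest matching prefix wins.
    """
    best_rule, best_len = None, -1
    for pattern in policy:
        if pattern.endswith("*"):
            prefix = pattern[:-1]
            matched = path.startswith(prefix)
            length = len(prefix)
        else:
            matched = path == pattern
            length = len(pattern) + 1  # an exact match beats a glob of equal length
        if matched and length > best_len:
            best_rule, best_len = pattern, length
    if best_rule is None:
        return []  # no rule matched, which Vault treats as denied
    caps = policy[best_rule]
    return ["deny"] if "deny" in caps else list(caps)


policy = {
    "secret/data/shared/*": ["read"],
    "database/creds/app-readonly": ["read"],
    "sys/*": ["deny"],
    "auth/token/renew-self": ["update"],
}

print(capabilities_for("secret/data/shared/smtp", policy))
print(capabilities_for("database/creds/app-readwrite", policy))
```

The second lookup returns an empty list because no rule covers the read-write role, which mirrors Vault's deny-by-default behavior.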

Vault Client Integration in Applications

# vault_client.py - Python Vault Integration
import hvac
import os
from typing import Optional, Dict, Any
import logging

logger = logging.getLogger(__name__)

class VaultClient:
    """Vault client supporting AppRole, token, and Kubernetes authentication."""
    
    def __init__(
        self,
        url: Optional[str] = None,
        token: Optional[str] = None,
        role_id: Optional[str] = None,
        secret_id: Optional[str] = None
    ):
        self.url = url or os.getenv('VAULT_ADDR', 'http://localhost:8200')
        self.client = hvac.Client(url=self.url)
        
        # Authenticate using AppRole (recommended for applications)
        if role_id and secret_id:
            self._authenticate_approle(role_id, secret_id)
        elif token:
            self.client.token = token
        else:
            # Try to use Kubernetes auth in K8s environments
            self._authenticate_kubernetes()
    
    def _authenticate_approle(self, role_id: str, secret_id: str):
        """Authenticate using AppRole method."""
        response = self.client.auth.approle.login(
            role_id=role_id,
            secret_id=secret_id
        )
        self.client.token = response['auth']['client_token']
        logger.info("Authenticated with Vault using AppRole")
    
    def _authenticate_kubernetes(self):
        """Authenticate using the pod's Kubernetes service account token."""
        jwt_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
        if not os.path.exists(jwt_path):
            # Fail loudly instead of returning an unauthenticated client
            raise RuntimeError(
                "No Vault credentials provided and no Kubernetes service account token found"
            )
        
        with open(jwt_path, 'r') as f:
            jwt = f.read().strip()
        
        role = os.getenv('VAULT_K8S_ROLE', 'app')
        response = self.client.auth.kubernetes.login(
            role=role,
            jwt=jwt
        )
        self.client.token = response['auth']['client_token']
        logger.info("Authenticated with Vault using Kubernetes auth")
    
    def get_secret(self, path: str, mount_point: str = 'secret') -> Dict[str, Any]:
        """Retrieve a secret from KV v2 engine."""
        try:
            response = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=mount_point
            )
            return response['data']['data']
        except hvac.exceptions.InvalidPath:
            logger.error(f"Secret not found at path: {path}")
            raise
    
    def get_database_credentials(self, role: str) -> Dict[str, str]:
        """Get dynamic database credentials."""
        response = self.client.secrets.database.generate_credentials(
            name=role
        )
        return {
            'username': response['data']['username'],
            'password': response['data']['password'],
            'lease_id': response['lease_id'],
            'lease_duration': response['lease_duration']
        }
    
    def get_aws_credentials(self, role: str) -> Dict[str, str]:
        """Get dynamic AWS credentials."""
        response = self.client.secrets.aws.generate_credentials(
            name=role
        )
        return {
            'access_key': response['data']['access_key'],
            'secret_key': response['data']['secret_key'],
            'security_token': response['data'].get('security_token'),
            'lease_duration': response['lease_duration']
        }
    
    def renew_lease(self, lease_id: str, increment: int = 3600):
        """Renew a lease to extend credential lifetime."""
        return self.client.sys.renew_lease(
            lease_id=lease_id,
            increment=increment
        )


# Usage in application
vault = VaultClient(
    role_id=os.getenv('VAULT_ROLE_ID'),
    secret_id=os.getenv('VAULT_SECRET_ID')
)

# Get static secrets
db_config = vault.get_secret('myapp/database')
api_keys = vault.get_secret('myapp/api-keys')

# Get dynamic database credentials
db_creds = vault.get_database_credentials('app-readonly')
print(f"Temporary DB user: {db_creds['username']}")
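
The double `['data']['data']` lookup in `get_secret` follows from how the KV v2 engine shapes its HTTP API. As a rough sketch using only the standard library, the helpers below build the read URL and unwrap a response body; the helper names and the sample payload are illustrative assumptions.

```python
import json
from typing import Any, Dict


def kv2_read_url(addr: str, path: str, mount_point: str = "secret") -> str:
    """Build the HTTP API URL for reading the latest version of a KV v2 secret."""
    return f"{addr.rstrip('/')}/v1/{mount_point}/data/{path.lstrip('/')}"


def extract_kv2_payload(response_body: str) -> Dict[str, Any]:
    """Return the key/value pairs from a KV v2 read response body."""
    document = json.loads(response_body)
    # The outer "data" wraps the response; the inner "data" holds the secret,
    # next to a "metadata" object carrying version information.
    return document["data"]["data"]


# Hypothetical response body in the KV v2 shape
sample_body = json.dumps({
    "data": {
        "data": {"username": "app_user", "password": "example-only"},
        "metadata": {"version": 3},
    }
})

print(kv2_read_url("http://localhost:8200", "myapp/database"))
print(sorted(extract_kv2_payload(sample_body)))
```

The extra `data/` segment in the URL is also why policies for KV v2 are written against `secret/data/...` rather than `secret/...`.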

Vault Agent Sidecar for Kubernetes

# deployment-with-vault-agent.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
      annotations:
        vault.hashicorp.com/agent-inject: "true"
        vault.hashicorp.com/role: "myapp"
        vault.hashicorp.com/agent-inject-secret-database.txt: "secret/data/myapp/database"
        vault.hashicorp.com/agent-inject-template-database.txt: |
          {{- with secret "secret/data/myapp/database" -}}
          DATABASE_URL=postgresql://{{ .Data.data.username }}:{{ .Data.data.password }}@{{ .Data.data.host }}:5432/{{ .Data.data.database }}
          {{- end -}}
        vault.hashicorp.com/agent-inject-secret-api-keys.json: "secret/data/myapp/api-keys"
        vault.hashicorp.com/agent-inject-template-api-keys.json: |
          {{- with secret "secret/data/myapp/api-keys" -}}
          {{ .Data.data | toJSON }}
          {{- end -}}
    spec:
      serviceAccountName: myapp
      containers:
        - name: myapp
          image: myapp:latest
          env:
            - name: DATABASE_CONFIG_FILE
              value: "/vault/secrets/database.txt"
            - name: API_KEYS_FILE
              value: "/vault/secrets/api-keys.json"
          # No volumeMounts entry is needed: the Vault Agent injector adds its own
          # shared volume and mounts it at /vault/secrets in every container.
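
On the application side, the rendered file is ordinary text that has to be read and parsed. The sketch below assumes the `KEY=VALUE` layout produced by the `database.txt` template above; the function names are hypothetical.

```python
from typing import Dict


def parse_env_file(text: str) -> Dict[str, str]:
    """Parse KEY=VALUE lines, ignoring blank lines and # comments."""
    values: Dict[str, str] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, since values such as URLs may contain '='
        key, separator, value = line.partition("=")
        if not separator:
            raise ValueError("Expected KEY=VALUE but found a line without '='")
        values[key.strip()] = value.strip()
    return values


def load_rendered_secrets(path: str) -> Dict[str, str]:
    """Read a file rendered by Vault Agent, e.g. /vault/secrets/database.txt."""
    with open(path, encoding="utf-8") as handle:
        return parse_env_file(handle.read())


# Example: os.environ["DATABASE_CONFIG_FILE"] would hold the path to load
```

Because Vault Agent can re-render the file when a secret changes, re-reading it on reconnect rather than caching it for the process lifetime is usually the safer design.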

AWS KMS and Secrets Manager

AWS KMS (Key Management Service) provides managed encryption key lifecycle management, while AWS Secrets Manager handles secret storage and rotation. Together, they form AWS’s complete secrets management solution.

AWS KMS Key Creation

# aws_kms_setup.py - KMS Key Management
import boto3
import json

kms_client = boto3.client('kms', region_name='us-east-1')

def create_kms_key(alias: str, description: str) -> str:
    """Create a customer-managed KMS key with proper policy."""
    
    # Key policy allowing admin and application access
    key_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Enable IAM User Permissions",
                "Effect": "Allow",
                "Principal": {
                    "AWS": f"arn:aws:iam::{get_account_id()}:root"
                },
                "Action": "kms:*",
                "Resource": "*"
            },
            {
                "Sid": "Allow Application Role",
                "Effect": "Allow",
                "Principal": {
                    "AWS": f"arn:aws:iam::{get_account_id()}:role/ApplicationRole"
                },
                "Action": [
                    "kms:Encrypt",
                    "kms:Decrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": "*"
            }
        ]
    }
    
    response = kms_client.create_key(
        Description=description,
        KeyUsage='ENCRYPT_DECRYPT',
        KeySpec='SYMMETRIC_DEFAULT',
        Policy=json.dumps(key_policy),
        Tags=[
            {'TagKey': 'Environment', 'TagValue': 'production'},
            {'TagKey': 'Application', 'TagValue': 'myapp'}
        ]
    )
    
    key_id = response['KeyMetadata']['KeyId']
    
    # Create alias for easier reference
    kms_client.create_alias(
        AliasName=f'alias/{alias}',
        TargetKeyId=key_id
    )
    
    # Enable automatic key rotation
    kms_client.enable_key_rotation(KeyId=key_id)
    
    return key_id


def encrypt_with_kms(key_alias: str, plaintext: str) -> bytes:
    """Encrypt data using KMS."""
    response = kms_client.encrypt(
        KeyId=f'alias/{key_alias}',
        Plaintext=plaintext.encode('utf-8'),
        EncryptionContext={
            'purpose': 'application-secrets'
        }
    )
    return response['CiphertextBlob']


def decrypt_with_kms(ciphertext: bytes) -> str:
    """Decrypt data using KMS."""
    response = kms_client.decrypt(
        CiphertextBlob=ciphertext,
        EncryptionContext={
            'purpose': 'application-secrets'
        }
    )
    return response['Plaintext'].decode('utf-8')


def get_account_id() -> str:
    """Get current AWS account ID."""
    sts = boto3.client('sts')
    return sts.get_caller_identity()['Account']

AWS Secrets Manager Integration

# secrets_manager_client.py - AWS Secrets Manager Integration
import boto3
import json
from typing import Dict, Any, Optional
from botocore.exceptions import ClientError
import logging

logger = logging.getLogger(__name__)

class AWSSecretsManager:
    """AWS Secrets Manager client with caching and rotation support."""
    
    def __init__(self, region: str = 'us-east-1'):
        self.client = boto3.client('secretsmanager', region_name=region)
        self._cache: Dict[str, Any] = {}
    
    def create_secret(
        self,
        name: str,
        secret_value: Dict[str, Any],
        description: str = '',
        kms_key_id: str = None
    ) -> str:
        """Create a new secret in Secrets Manager."""
        params = {
            'Name': name,
            'Description': description,
            'SecretString': json.dumps(secret_value),
            'Tags': [
                {'Key': 'ManagedBy', 'Value': 'application'},
                {'Key': 'Environment', 'Value': 'production'}
            ]
        }
        
        if kms_key_id:
            params['KmsKeyId'] = kms_key_id
        
        response = self.client.create_secret(**params)
        return response['ARN']
    
    def get_secret(self, secret_name: str, use_cache: bool = True) -> Dict[str, Any]:
        """Retrieve a secret value."""
        if use_cache and secret_name in self._cache:
            return self._cache[secret_name]
        
        try:
            response = self.client.get_secret_value(SecretId=secret_name)
            
            if 'SecretString' in response:
                secret = json.loads(response['SecretString'])
            else:
                # Handle binary secrets
                secret = response['SecretBinary']
            
            self._cache[secret_name] = secret
            return secret
            
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'ResourceNotFoundException':
                logger.error(f"Secret {secret_name} not found")
            elif error_code == 'DecryptionFailureException':
                logger.error(f"Cannot decrypt secret {secret_name}")
            raise
    
    def rotate_secret(self, secret_name: str, rotation_lambda_arn: str):
        """Enable automatic rotation for a secret."""
        self.client.rotate_secret(
            SecretId=secret_name,
            RotationLambdaARN=rotation_lambda_arn,
            RotationRules={
                # AutomaticallyAfterDays and ScheduleExpression are mutually exclusive
                'AutomaticallyAfterDays': 30
            }
        )
    
    def update_secret(self, secret_name: str, secret_value: Dict[str, Any]):
        """Update an existing secret."""
        self.client.update_secret(
            SecretId=secret_name,
            SecretString=json.dumps(secret_value)
        )
        # Invalidate cache
        self._cache.pop(secret_name, None)
    
    def clear_cache(self):
        """Clear the local secret cache."""
        self._cache.clear()


# Usage example
secrets_manager = AWSSecretsManager()

# Create a database secret
secrets_manager.create_secret(
    name='prod/myapp/database',
    secret_value={
        'host': 'db.example.com',
        'port': 5432,
        'username': 'app_user',
        'password': 'secure_password_here',
        'database': 'production'
    },
    kms_key_id='alias/myapp-secrets-key'
)

# Retrieve secret in application
db_secret = secrets_manager.get_secret('prod/myapp/database')
connection_string = f"postgresql://{db_secret['username']}:{db_secret['password']}@{db_secret['host']}:{db_secret['port']}/{db_secret['database']}"
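
One caveat with the in-memory cache above is that entries never expire, so a rotated secret stays stale until the process restarts or `clear_cache` is called. A possible alternative, sketched here with a hypothetical `TTLSecretCache` wrapper, is to bound each entry's lifetime; the fetch function and clock are injected so the class works with any backend.

```python
import time
from typing import Any, Callable, Dict, Tuple


class TTLSecretCache:
    """Cache secret lookups for a bounded time so rotations are picked up."""

    def __init__(
        self,
        fetch: Callable[[str], Any],
        ttl_seconds: float = 300.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, name: str) -> Any:
        now = self._clock()
        entry = self._entries.get(name)
        if entry is not None and now < entry[0]:
            return entry[1]  # still fresh
        value = self._fetch(name)
        self._entries[name] = (now + self._ttl, value)
        return value

    def invalidate(self, name: str) -> None:
        """Drop one entry, for example after an authentication failure."""
        self._entries.pop(name, None)


# Example wiring with the class above:
# cache = TTLSecretCache(lambda n: secrets_manager.get_secret(n, use_cache=False))
```

The TTL trades a few extra API calls for a bounded window in which an old password can still be served; five minutes is an arbitrary default, not a recommendation from AWS.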

Secrets Rotation Lambda

# rotation_lambda.py - Automatic Secret Rotation
import boto3
import json
import logging
import psycopg2
from typing import Dict

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event: Dict, context) -> None:
    """Handle secret rotation lifecycle events."""
    secret_id = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']
    
    secrets_client = boto3.client('secretsmanager')
    
    if step == 'createSecret':
        create_secret(secrets_client, secret_id, token)
    elif step == 'setSecret':
        set_secret(secrets_client, secret_id, token)
    elif step == 'testSecret':
        test_secret(secrets_client, secret_id, token)
    elif step == 'finishSecret':
        finish_secret(secrets_client, secret_id, token)
    else:
        raise ValueError(f"Unknown rotation step: {step}")


def create_secret(client, secret_id: str, token: str) -> None:
    """Create a new version of the secret."""
    # Get current secret
    current = client.get_secret_value(
        SecretId=secret_id,
        VersionStage='AWSCURRENT'
    )
    current_secret = json.loads(current['SecretString'])
    
    # Generate new password
    import secrets
    import string
    alphabet = string.ascii_letters + string.digits + "!@#$%^&*()"
    new_password = ''.join(secrets.choice(alphabet) for _ in range(32))
    
    # Create new secret version
    new_secret = current_secret.copy()
    new_secret['password'] = new_password
    
    client.put_secret_value(
        SecretId=secret_id,
        ClientRequestToken=token,
        SecretString=json.dumps(new_secret),
        VersionStages=['AWSPENDING']
    )


def set_secret(client, secret_id: str, token: str) -> None:
    """Set the new secret in the database."""
    # Get pending and current secrets
    pending = client.get_secret_value(
        SecretId=secret_id,
        VersionId=token,
        VersionStage='AWSPENDING'
    )
    pending_secret = json.loads(pending['SecretString'])
    
    current = client.get_secret_value(
        SecretId=secret_id,
        VersionStage='AWSCURRENT'
    )
    current_secret = json.loads(current['SecretString'])
    
    # Connect to database and change password
    conn = psycopg2.connect(
        host=current_secret['host'],
        port=current_secret['port'],
        user=current_secret['username'],
        password=current_secret['password'],
        database=current_secret['database']
    )
    
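    # Quote the role name as an identifier instead of interpolating it into the SQL string
    from psycopg2 import sql
    with conn.cursor() as cursor:
        cursor.execute(
            sql.SQL("ALTER USER {} WITH PASSWORD %s").format(
                sql.Identifier(current_secret['username'])
            ),
            (pending_secret['password'],)
        )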
    conn.commit()
    conn.close()


def test_secret(client, secret_id: str, token: str) -> None:
    """Test the new secret works."""
    pending = client.get_secret_value(
        SecretId=secret_id,
        VersionId=token,
        VersionStage='AWSPENDING'
    )
    pending_secret = json.loads(pending['SecretString'])
    
    # Test connection with new password
    conn = psycopg2.connect(
        host=pending_secret['host'],
        port=pending_secret['port'],
        user=pending_secret['username'],
        password=pending_secret['password'],
        database=pending_secret['database']
    )
    conn.close()
    logger.info("New secret validated successfully")


def finish_secret(client, secret_id: str, token: str) -> None:
    """Finish the rotation by updating version stages."""
    metadata = client.describe_secret(SecretId=secret_id)
    
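    # Find the version that currently holds AWSCURRENT
    current_version = None
    for version, stages in metadata['VersionIdsToStages'].items():
        if 'AWSCURRENT' in stages:
            if version == token:
                # The new version is already current; nothing left to do
                logger.info(f"Version {token} is already AWSCURRENT for {secret_id}")
                return
            current_version = version
            break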
    
    # Move AWSCURRENT to the new version
    client.update_secret_version_stage(
        SecretId=secret_id,
        VersionStage='AWSCURRENT',
        MoveToVersionId=token,
        RemoveFromVersionId=current_version
    )
    
    logger.info(f"Secret rotation completed for {secret_id}")
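
The effect of the final step on staging labels is easier to see on a plain dictionary. The sketch below is a simplified in-memory model of the label movement, not the Secrets Manager API: the function name `promote_pending` is hypothetical, the model drops the `AWSPENDING` label for brevity (in practice it may remain on the new version), and versions left without any label are omitted.

```python
from typing import Dict, List


def promote_pending(versions: Dict[str, List[str]], token: str) -> Dict[str, List[str]]:
    """Return a copy of the version-to-stages map after a successful rotation."""
    if "AWSPENDING" not in versions.get(token, []):
        raise ValueError("token does not identify the AWSPENDING version")
    result: Dict[str, List[str]] = {}
    for version, stages in versions.items():
        if version == token:
            result[version] = ["AWSCURRENT"]      # new version becomes current
        elif "AWSCURRENT" in stages:
            result[version] = ["AWSPREVIOUS"]     # old current is kept as previous
        else:
            # The older AWSPREVIOUS label moves away; unlabeled versions are dropped
            remaining = [s for s in stages if s != "AWSPREVIOUS"]
            if remaining:
                result[version] = remaining
    return result


before = {"v1": ["AWSPREVIOUS"], "v2": ["AWSCURRENT"], "v3": ["AWSPENDING"]}
after = promote_pending(before, "v3")
print(after)
```

Keeping the outgoing version under `AWSPREVIOUS` is what allows a quick rollback if the new credential turns out to be faulty after promotion.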

Azure Key Vault

Azure Key Vault is Microsoft’s managed secrets service. It integrates with other Azure services and with Microsoft Entra ID (formerly Azure Active Directory) for authentication.

# azure_key_vault_client.py - Azure Key Vault Integration
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.keyvault.secrets import SecretClient
from azure.keyvault.keys import KeyClient
from azure.keyvault.keys.crypto import CryptographyClient, EncryptionAlgorithm
from typing import Dict, Any, Optional
import json
import logging

logger = logging.getLogger(__name__)

class AzureKeyVaultClient:
    """Azure Key Vault client for secrets and key management."""
    
    def __init__(self, vault_url: str, use_managed_identity: bool = False):
        self.vault_url = vault_url
        
        if use_managed_identity:
            credential = ManagedIdentityCredential()
        else:
            credential = DefaultAzureCredential()
        
        self.secret_client = SecretClient(
            vault_url=vault_url,
            credential=credential
        )
        self.key_client = KeyClient(
            vault_url=vault_url,
            credential=credential
        )
        self._credential = credential
    
    def set_secret(self, name: str, value: str, **kwargs) -> str:
        """Store a secret in Key Vault."""
        secret = self.secret_client.set_secret(
            name=name,
            value=value,
            content_type=kwargs.get('content_type', 'application/json'),
            tags=kwargs.get('tags', {})
        )
        return secret.id
    
    def get_secret(self, name: str, version: str = None) -> str:
        """Retrieve a secret from Key Vault."""
        secret = self.secret_client.get_secret(name, version)
        return secret.value
    
    def get_secret_json(self, name: str) -> Dict[str, Any]:
        """Retrieve and parse a JSON secret."""
        value = self.get_secret(name)
        return json.loads(value)
    
    def delete_secret(self, name: str) -> None:
        """Delete a secret (soft delete if enabled)."""
        poller = self.secret_client.begin_delete_secret(name)
        poller.result()
    
    def list_secrets(self) -> list:
        """List all secrets in the vault."""
        return [s.name for s in self.secret_client.list_properties_of_secrets()]
    
    def encrypt_data(self, key_name: str, plaintext: bytes) -> bytes:
        """Encrypt data using a Key Vault key."""
        key = self.key_client.get_key(key_name)
        crypto_client = CryptographyClient(
            key,
            credential=self._credential
        )
        result = crypto_client.encrypt(
            EncryptionAlgorithm.rsa_oaep_256,
            plaintext
        )
        return result.ciphertext
    
    def decrypt_data(self, key_name: str, ciphertext: bytes) -> bytes:
        """Decrypt data using a Key Vault key."""
        key = self.key_client.get_key(key_name)
        crypto_client = CryptographyClient(
            key,
            credential=self._credential
        )
        result = crypto_client.decrypt(
            EncryptionAlgorithm.rsa_oaep_256,
            ciphertext
        )
        return result.plaintext


# Usage in Azure App Service or AKS
vault_client = AzureKeyVaultClient(
    vault_url="https://myapp-vault.vault.azure.net/",
    use_managed_identity=True
)

# Store secrets
vault_client.set_secret(
    name="database-connection",
    value=json.dumps({
        "host": "mydb.postgres.database.azure.com",
        "username": "app_user",
        "password": "secure_password"
    }),
    tags={"environment": "production"}
)

# Retrieve secrets
db_config = vault_client.get_secret_json("database-connection")

SOPS for GitOps Secrets

SOPS (Secrets OPerationS), originally developed at Mozilla and now a CNCF project, encrypts secret values inside files stored in Git repositories, enabling GitOps workflows while keeping sensitive data protected.

# .sops.yaml - SOPS Configuration
creation_rules:
  # Production secrets encrypted with AWS KMS
  - path_regex: secrets/prod/.*\.yaml$
    kms: arn:aws:kms:us-east-1:123456789:key/mrk-production-key
    
  # Staging secrets encrypted with different key
  - path_regex: secrets/staging/.*\.yaml$
    kms: arn:aws:kms:us-east-1:123456789:key/mrk-staging-key
    
  # Development secrets with age encryption (local dev)
  - path_regex: secrets/dev/.*\.yaml$
    age: age1ql3z7hjy54pw3hyww5ayyfg7zqgvc7w3j2elw8zmrj2kg5sfn9aqmcac8p

# secrets/prod/database.yaml - Encrypted with SOPS
# Run: sops secrets/prod/database.yaml to edit
database:
    host: ENC[AES256_GCM,data:cHJvZC1kYi5leGFtcGxlLmNvbQ==,iv:...,tag:...]
    username: ENC[AES256_GCM,data:YXBwX3VzZXI=,iv:...,tag:...]
    password: ENC[AES256_GCM,data:c3VwZXJfc2VjcmV0X3Bhc3N3b3Jk,iv:...,tag:...]
    port: 5432  # Plaintext only if excluded (e.g. via encrypted_regex); SOPS encrypts all values by default
sops:
    kms:
        - arn: arn:aws:kms:us-east-1:123456789:key/mrk-production-key
          created_at: "2024-01-15T10:30:00Z"
          enc: AQICAHh...
    lastmodified: "2024-01-15T10:30:00Z"
    version: 3.8.1

# SOPS CLI Operations

# Create new encrypted file
sops secrets/prod/api-keys.yaml

# Edit existing encrypted file (decrypts in memory)
sops secrets/prod/database.yaml

# Decrypt to stdout
sops -d secrets/prod/database.yaml

# Encrypt existing plaintext file
sops -e secrets/prod/plaintext.yaml > secrets/prod/encrypted.yaml

# Rotate the data encryption key (older SOPS releases use: sops -r -i <file>)
sops rotate -i secrets/prod/database.yaml

# Use in Kubernetes with kustomize (requires a SOPS generator plugin such as KSOPS)
kustomize build --enable-alpha-plugins . | kubectl apply -f -

# sops_integration.py - Load SOPS secrets in Python
import subprocess
import yaml
import json
from typing import Dict, Any

def load_sops_secret(file_path: str) -> Dict[str, Any]:
    """Decrypt and load a SOPS-encrypted file."""
    result = subprocess.run(
        ['sops', '-d', file_path],
        capture_output=True,
        text=True,
        check=True
    )
    
    if file_path.endswith('.json'):
        return json.loads(result.stdout)
    else:
        return yaml.safe_load(result.stdout)

# Usage
db_secrets = load_sops_secret('secrets/prod/database.yaml')
print(f"Database host: {db_secrets['database']['host']}")
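
Because SOPS files live in Git next to ordinary configuration, a common failure mode is committing a file before it has been encrypted. The sketch below is a heuristic guard that could run in a pre-commit hook. It only checks for a top-level `sops:` block and at least one `ENC[...]` value, so it is an assumption-laden approximation and does not validate the file cryptographically.

```python
import re

# An encrypted value as written by SOPS, e.g. ENC[AES256_GCM,data:...,iv:...,tag:...]
ENC_VALUE = re.compile(r"ENC\[AES256_GCM,data:[^\]]*\]")
# The metadata block SOPS appends at the top level of the document
SOPS_METADATA = re.compile(r"^sops:\s*$", re.MULTILINE)


def looks_sops_encrypted(yaml_text: str) -> bool:
    """Heuristic: a top-level `sops:` block plus at least one ENC[...] value."""
    has_metadata = bool(SOPS_METADATA.search(yaml_text))
    has_encrypted_value = bool(ENC_VALUE.search(yaml_text))
    return has_metadata and has_encrypted_value


encrypted_sample = (
    "database:\n"
    "    password: ENC[AES256_GCM,data:abc,iv:...,tag:...]\n"
    "sops:\n"
    "    version: 3.8.1\n"
)
plaintext_sample = "database:\n    password: hunter2\n"

print(looks_sops_encrypted(encrypted_sample), looks_sops_encrypted(plaintext_sample))
```

A hook built on this check would reject any file under `secrets/` for which the function returns false.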

Comparing Secrets Management Tools

Tool | Best For | Key Strengths | Limitations | Cost Model
-----|----------|---------------|-------------|-----------
HashiCorp Vault | Multi-cloud, on-prem, enterprises | Dynamic secrets, flexibility, free Community edition | Complex setup, operational overhead | Free Community edition (source-available BUSL license) / paid Enterprise
AWS Secrets Manager | AWS-native workloads | Managed, auto-rotation, Lambda integration | AWS-only, per-secret pricing | $0.40/secret/month + API calls
AWS KMS | Encryption key management | HSM-backed, FIPS compliant, AWS integration | Keys only, no secret storage | $1/key/month + API calls
Azure Key Vault | Azure workloads | Managed identity, certificates, HSM | Azure-specific | Tiered pricing by operations
Google Secret Manager | GCP workloads | IAM integration, replication | GCP-specific | $0.03/10K operations
SOPS | GitOps, developers | Git-native, multiple backends, free | No dynamic secrets, manual rotation | Free (uses cloud KMS)
Doppler | Developer teams, startups | Easy setup, sync to platforms | Fewer enterprise features | Free tier / paid plans
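
Per-secret pricing is easiest to judge with a quick estimate. The sketch below uses the $0.40 per secret per month figure from the table; the $0.05 per 10,000 API calls default is an assumption that does not appear in the table and should be checked against current AWS pricing before relying on the result.

```python
def estimate_secrets_manager_cost(
    num_secrets: int,
    api_calls_per_month: int,
    price_per_secret: float = 0.40,      # from the comparison table
    price_per_10k_calls: float = 0.05,   # assumed rate, verify against AWS pricing
) -> float:
    """Estimate the monthly AWS Secrets Manager bill in US dollars."""
    storage_cost = num_secrets * price_per_secret
    request_cost = (api_calls_per_month / 10_000) * price_per_10k_calls
    return round(storage_cost + request_cost, 2)


# 100 secrets read one million times per month
print(estimate_secrets_manager_cost(100, 1_000_000))
```

At this scale storage dominates the bill, which is why client-side caching mainly helps with latency and throttling rather than cost.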

Common Mistakes to Avoid

1. Not Rotating Secrets Regularly

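# WRONG - Static credentials that never change
DB_PASSWORD = "same_password_for_5_years"

# CORRECT - Dynamic credentials that are re-issued before the lease expires
import time
import psycopg2

class DatabaseConnection:
    REFRESH_MARGIN_SECONDS = 300  # Re-issue credentials 5 minutes before expiry
    
    def __init__(self, vault_client: VaultClient):
        self.vault = vault_client
        self._connection = None
        self._lease_id = None
        self._expires_at = 0.0
    
    def get_connection(self):
        if self._needs_refresh():
            creds = self.vault.get_database_credentials('app-readonly')
            if self._connection is not None:
                self._connection.close()
            self._connection = self._create_connection(creds)
            self._lease_id = creds['lease_id']
            self._expires_at = time.monotonic() + creds['lease_duration']
        return self._connection
    
    def _needs_refresh(self) -> bool:
        # Refresh when there is no connection yet or the lease is close to expiry
        return self._connection is None or self._is_lease_expiring()
    
    def _is_lease_expiring(self) -> bool:
        return time.monotonic() >= self._expires_at - self.REFRESH_MARGIN_SECONDS
    
    def _create_connection(self, creds):
        # Host and database match the example Vault database configuration
        return psycopg2.connect(
            host='db.example.com', port=5432, dbname='production',
            user=creds['username'], password=creds['password'], sslmode='require'
        )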

2. Overly Broad Access Policies

# WRONG - Allows access to all secrets
path "secret/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}

# CORRECT - Scoped to specific application paths
path "secret/data/myapp/*" {
  capabilities = ["read"]
}

path "secret/data/myapp/{{identity.entity.name}}/*" {
  capabilities = ["create", "read", "update", "delete"]
}

3. Logging Secrets Accidentally

# WRONG - Secrets in logs
logger.info(f"Connecting to database with password: {db_password}")
print(f"API Key: {api_key}")

# CORRECT - Mask sensitive values
import logging
import re

logger = logging.getLogger(__name__)

class SecretFilter(logging.Filter):
    """Filter to mask secrets in log output."""
    
    PATTERNS = [
        (r'password["\']?\s*[:=]\s*["\']?([^"\',\s]+)', 'password=***'),
        (r'api[_-]?key["\']?\s*[:=]\s*["\']?([^"\',\s]+)', 'api_key=***'),
        (r'token["\']?\s*[:=]\s*["\']?([^"\',\s]+)', 'token=***'),
    ]
    
    def filter(self, record):
        message = record.getMessage()
        for pattern, replacement in self.PATTERNS:
            message = re.sub(pattern, replacement, message, flags=re.IGNORECASE)
        record.msg = message
        record.args = ()
        return True

logger.addFilter(SecretFilter())
logger.info("Connected to database")  # Log connection without credentials

4. Hardcoded Vault Tokens

# WRONG - Hardcoded token
vault_client = hvac.Client(url='http://vault:8200')
vault_client.token = 's.ABC123hardcodedtoken'

# CORRECT - Use AppRole or Kubernetes auth
vault_client = VaultClient(
    role_id=os.getenv('VAULT_ROLE_ID'),
    secret_id=os.getenv('VAULT_SECRET_ID')
)

# Or with Kubernetes authentication
vault_client = VaultClient()  # Auto-detects K8s environment

Best Practices for Secrets Management

  • Implement least-privilege access: Grant minimum permissions needed for each service.
  • Use dynamic secrets when possible: Short-lived credentials limit blast radius of compromises.
  • Enable comprehensive audit logging: Track all secret access for security and compliance.
  • Automate rotation: Never rely on manual credential rotation processes.
  • Encrypt secrets at rest and in transit: Use TLS for all vault communications.
  • Separate secrets by environment: Production secrets should never be accessible from dev.
  • Use managed identities: Prefer cloud provider identity federation over static credentials.
  • Monitor for secret exposure: Use tools like git-secrets and truffleHog in CI/CD.
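
The last practice, monitoring for exposure, comes down to pattern matching over text about to be committed. The sketch below is a minimal illustration of that idea with three hand-picked patterns. It is not a substitute for git-secrets or truffleHog, which cover far more credential formats, and the pattern names are hypothetical labels.

```python
import re
from typing import List, Tuple

# A few well-known credential shapes; real scanners ship many more rules
PATTERNS = {
    "aws-access-key-id": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    "stripe-live-secret-key": re.compile(r"\bsk_live_[0-9a-zA-Z]{24,}\b"),
    "private-key-header": re.compile(r"-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----"),
}


def scan_text(text: str) -> List[Tuple[int, str]]:
    """Return (line_number, pattern_name) for every suspicious line."""
    findings: List[Tuple[int, str]] = []
    for line_number, line in enumerate(text.splitlines(), start=1):
        for label, pattern in PATTERNS.items():
            if pattern.search(line):
                # Record only the location and rule name, not the matched value
                findings.append((line_number, label))
    return findings


sample = (
    "AWS_KEY=AKIAIOSFODNN7EXAMPLE\n"
    "name=demo\n"
    "STRIPE=sk_live_4eC39HqLyjWDarjtT1zdp7dc\n"
)
print(scan_text(sample))
```

Run against staged changes in CI, a non-empty result would fail the build before the credential reaches the repository history.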

Final Thoughts

Choosing the right secrets management solution depends on your infrastructure complexity, cloud strategy, and security maturity. HashiCorp Vault offers unmatched flexibility for multi-cloud and hybrid environments with advanced features like dynamic secrets. AWS Secrets Manager and KMS provide seamless integration for AWS-native workloads with minimal operational overhead. Azure Key Vault and Google Secret Manager serve similar roles in their respective clouds. For GitOps workflows, SOPS enables encrypted secrets in version control.

The key is choosing a solution that fits your environment while following security best practices: rotate credentials automatically, enforce least-privilege access, audit all access, and never store secrets in code repositories. For more on securing your applications, read Advanced API Security: Scopes, Claims, and Token Revocation and Securing Spring Boot Apps with OAuth2 and Keycloak. For official documentation, explore the HashiCorp Vault documentation and AWS Secrets Manager documentation.
