Enabling RCP in WordPress and Fetching Permission Checks with Python

This walkthrough implements an architecture in which Python calls a custom WordPress REST API endpoint to get a user's permissions (WordPress role plus Restrict Content Pro membership data) and caches them in Redis.

This requires code in two places:

  1. WordPress: A custom REST API endpoint.
  2. Python: Code to call the endpoint and manage the Redis cache.

Part 1: WordPress Custom REST API Endpoint

Add this code to your active theme's functions.php file or, preferably, create a custom plugin for it.

PHP

<?php

/**
 * Register custom REST API endpoint to get user permissions.
 * Endpoint: /my-plugin/v1/user-permissions/<user_id>
 */
add_action( 'rest_api_init', 'register_user_permissions_endpoint' );

function register_user_permissions_endpoint() {
    register_rest_route( 'my-plugin/v1', '/user-permissions/(?P<id>\d+)', [
        'methods'             => WP_REST_Server::READABLE, // GET request
        'callback'            => 'get_user_permissions_callback',
        'args'                => [
            'id' => [
                'validate_callback' => function( $param, $request, $key ) {
                    return is_numeric( $param );
                },
                'required'          => true,
                'description'       => 'WordPress User ID',
            ],
        ],
        // IMPORTANT: Add permission checks here!
        'permission_callback' => 'check_permissions_api_key', // Custom function for security
    ] );
}

/**
 * Permission callback: Check for a valid API Key in the request header.
 *
 * IMPORTANT: Replace 'YOUR_SUPER_SECRET_API_KEY' with a strong, unique key.
 * Store this key securely and provide it to your Python application.
 */
function check_permissions_api_key( $request ) {
    $provided_key = $request->get_header( 'X-API-Key' );
    $expected_key = defined('MY_PLUGIN_USER_PERMISSIONS_API_KEY') ? MY_PLUGIN_USER_PERMISSIONS_API_KEY : 'YOUR_SUPER_SECRET_API_KEY'; // Define in wp-config.php for better security

    // Use hash_equals for timing attack safe comparison
    if ( ! empty( $provided_key ) && hash_equals( $expected_key, $provided_key ) ) {
        return true; // Allow access if keys match
    }

    // If keys don't match, return an error
    return new WP_Error( 'rest_forbidden', esc_html__( 'Invalid or missing API Key.', 'my-plugin' ), [ 'status' => 401 ] );
}

/**
 * Callback function for the REST API endpoint.
 * Fetches WP Role and RCP data for the given user ID.
 */
function get_user_permissions_callback( WP_REST_Request $request ) {
    $user_id = (int) $request['id'];
    $user = get_userdata( $user_id );

    if ( ! $user ) {
        return new WP_Error( 'rest_user_not_found', esc_html__( 'User not found.', 'my-plugin' ), [ 'status' => 404 ] );
    }

    // --- WordPress Role ---
    $wp_roles = ! empty( $user->roles ) ? $user->roles : [];
    $user_wp_role = ! empty( $wp_roles ) ? $wp_roles[0] : null;

    // --- RCP Data ---
    $rcp_data = [
        'membership_level_name' => null,
        'access_level'          => null,
        'expiration_date'       => null, // Use null for non-expiring or no membership
        'status'                => 'none', // e.g., 'none', 'active', 'expired', 'pending', 'cancelled'
    ];

    if ( function_exists( 'rcp_get_customer_by_user_id' ) && function_exists( 'rcp_get_customer_memberships' ) && function_exists( 'rcp_get_membership_level' ) ) {
        $customer = rcp_get_customer_by_user_id( $user_id );

        if ( $customer ) {
            // Get active memberships first. If none, maybe check for expired/pending if needed.
            // Prioritizing 'active' state for permissions is common.
            $memberships = rcp_get_customer_memberships( $customer->id, 'active' );

            // If no active membership, you might want to fetch the *most recent* non-active one
            // to potentially get status like 'expired'. This depends on your needs.
            // Example: Fetch most recent overall if no active ones found:
            if ( empty( $memberships ) ) {
                 $all_memberships = rcp_get_customer_memberships( $customer->id ); // Get all
                 if (!empty($all_memberships)) {
                     $memberships = [ $all_memberships[0] ]; // Take the most recent one regardless of status
                 }
            }

            if ( ! empty( $memberships ) ) {
                $membership = $memberships[0]; // Taking the first/most relevant one
                $level_id = $membership->get_object_id();
                $level = rcp_get_membership_level( $level_id );

                if ( $level ) {
                    $rcp_data['membership_level_name'] = $level->get_name();
                    $rcp_data['access_level'] = $level->get_access_level();
                }

                $rcp_data['status'] = $membership->get_status(); // Get status like 'active', 'expired', etc.

                $expiration_timestamp = $membership->get_expiration_timestamp();
                if ( $expiration_timestamp ) {
                    $rcp_data['expiration_date'] = gmdate( 'c', $expiration_timestamp ); // ISO 8601 format (UTC)
                } elseif ( $rcp_data['status'] === 'active' ) {
                     // If membership is active but has no expiration timestamp, assume lifetime
                    $rcp_data['expiration_date'] = 'lifetime';
                }
            }
        }
    } else {
         // RCP plugin might be inactive
         return new WP_Error( 'rcp_not_active', esc_html__( 'Restrict Content Pro is not active or functions are unavailable.', 'my-plugin' ), [ 'status' => 501 ] );
    }

    // --- Prepare Response ---
    $response_data = [
        'user_id'   => $user_id,
        'wp_role'   => $user_wp_role,
        'rcp'       => $rcp_data,
    ];

    return new WP_REST_Response( $response_data, 200 );
}

?>

WordPress Code Explanation & Security:

  1. register_rest_route: Creates the endpoint /wp-json/my-plugin/v1/user-permissions/{USER_ID}.
  2. permission_callback: This is CRITICAL for security. The example uses check_permissions_api_key.
    • check_permissions_api_key: Looks for a custom HTTP header X-API-Key.
    • SECRET KEY: Replace 'YOUR_SUPER_SECRET_API_KEY' with a strong, unique key. Best Practice: Define this key in your wp-config.php like define('MY_PLUGIN_USER_PERMISSIONS_API_KEY', 'your_real_secret_key'); and use that constant in the code. This keeps the key out of your theme/plugin files.
    • The Python app must send this exact key in the X-API-Key header when calling the endpoint.
    • hash_equals is used for secure comparison to prevent timing attacks.
  3. get_user_permissions_callback:
    • Gets the User ID from the URL.
    • Fetches the WP_User object.
    • Gets the WordPress role.
    • Checks if RCP functions exist.
    • Gets the RCP Customer and their memberships (prioritizing active, but fetching most recent if none are active to get status like 'expired').
    • Extracts RCP level name, access level, expiration date (using ISO 8601 format or 'lifetime'), and status.
    • Returns all data in a JSON response.
  4. Error Handling: Includes basic checks for user not found, RCP not active, and invalid API Key.
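
To make the response shape described above concrete, here is a minimal Python sketch that turns the endpoint's "rcp" object into typed fields. The field names come from the PHP callback; the RcpInfo class, the parse_rcp helper, and the sample values (user 42, level "Gold") are hypothetical and only serve as illustration. It assumes expiration_date is either null, the string 'lifetime', or an ISO 8601 timestamp as produced by gmdate('c', ...).

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class RcpInfo:
    """Typed view of the "rcp" object (hypothetical helper class)."""
    level_name: Optional[str]
    access_level: Optional[int]
    status: str
    expires_at: Optional[datetime]  # None when no date is recorded or for lifetime
    lifetime: bool


def parse_rcp(payload: dict) -> RcpInfo:
    """Convert the endpoint's JSON payload into an RcpInfo."""
    rcp = payload.get("rcp") or {}
    raw_expiry = rcp.get("expiration_date")

    lifetime = raw_expiry == "lifetime"
    expires_at = None
    if raw_expiry and not lifetime:
        # gmdate('c', ...) yields strings like "2030-01-31T00:00:00+00:00"
        expires_at = datetime.fromisoformat(raw_expiry)

    level = rcp.get("access_level")
    return RcpInfo(
        level_name=rcp.get("membership_level_name"),
        access_level=int(level) if level is not None else None,
        status=rcp.get("status") or "none",
        expires_at=expires_at,
        lifetime=lifetime,
    )


# Made-up sample payload in the shape the endpoint returns.
sample = {
    "user_id": 42,
    "wp_role": "subscriber",
    "rcp": {
        "membership_level_name": "Gold",
        "access_level": 5,
        "expiration_date": "2030-01-31T00:00:00+00:00",
        "status": "active",
    },
}
info = parse_rcp(sample)
print(info)
```

Keeping 'lifetime' as a separate boolean avoids mixing a sentinel string with real timestamps further down in the application.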

Part 2: Python Application Code Example

This example uses the requests library to call the API and redis-py to interact with Redis.

Python

import requests
import redis
import json
import os
from datetime import timedelta

# --- Configuration ---
# Best practice: Load these from environment variables or a config file
WP_API_URL = os.getenv("WP_API_URL", "https://yourdomain.com/wp-json/my-plugin/v1/user-permissions/")
WP_API_KEY = os.getenv("WP_API_KEY", "YOUR_SUPER_SECRET_API_KEY") # Use the SAME key as in WordPress
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", 6379))
REDIS_DB = int(os.getenv("REDIS_DB", 0))
CACHE_TTL_SECONDS = int(os.getenv("CACHE_TTL_SECONDS", 300)) # Cache for 5 minutes by default

# Connect to Redis
try:
    # redis.Redis is the current class name; StrictRedis is kept only as a legacy alias
    redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, decode_responses=True)
    redis_client.ping() # Verify connection
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    print(f"Error connecting to Redis: {e}")
    # Handle connection error appropriately - maybe fallback or raise exception
    redis_client = None # Set to None to indicate failure

def get_user_permissions(user_id: int) -> dict | None:
    """
    Fetches user permissions, checking Redis cache first, then calling WP API.

    Args:
        user_id: The WordPress user ID.

    Returns:
        A dictionary containing permission data, or None if an error occurs.
    """
    if not redis_client:
        print("Redis client not available.")
        # Optionally, try calling the API directly without caching
        # return fetch_from_wp_api(user_id)
        return None

    cache_key = f"user_perms:{user_id}"

    try:
        # 1. Check Cache
        cached_data = redis_client.get(cache_key)
        if cached_data:
            print(f"Cache HIT for user {user_id}")
            return json.loads(cached_data) # Data is stored as JSON string in Redis

        # 2. Cache MISS - Fetch from WordPress API
        print(f"Cache MISS for user {user_id}. Fetching from WP API...")
        permissions_data = fetch_from_wp_api(user_id)

        if permissions_data:
            # 3. Store in Cache
            try:
                redis_client.setex(
                    cache_key,
                    timedelta(seconds=CACHE_TTL_SECONDS),
                    json.dumps(permissions_data) # Store as JSON string
                )
                print(f"Stored permissions for user {user_id} in cache (TTL: {CACHE_TTL_SECONDS}s).")
            except redis.exceptions.RedisError as e:
                 print(f"Error writing to Redis cache for user {user_id}: {e}")
                 # Proceed with the data fetched, even if caching failed

            return permissions_data
        else:
            # API fetch failed or returned no data
            return None

    except redis.exceptions.RedisError as e:
        print(f"Redis error for user {user_id}: {e}")
        # Fallback: Try fetching directly from API if Redis fails?
        # return fetch_from_wp_api(user_id)
        return None
    except Exception as e:
        print(f"An unexpected error occurred for user {user_id}: {e}")
        return None

def fetch_from_wp_api(user_id: int) -> dict | None:
    """Helper function to call the WordPress REST API."""
    api_endpoint = f"{WP_API_URL}{user_id}"
    headers = {
        'X-API-Key': WP_API_KEY,
        'Accept': 'application/json'
    }

    try:
        response = requests.get(api_endpoint, headers=headers, timeout=10) # Add a timeout
        response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)

        print(f"Successfully fetched data from WP API for user {user_id}")
        return response.json()

    except requests.exceptions.RequestException as e:
        print(f"Error calling WP API for user {user_id}: {e}")
        # Handle different error types (connection, timeout, HTTP errors) if needed
        return None
    except json.JSONDecodeError as e:
         print(f"Error decoding JSON response from WP API for user {user_id}: {e}")
         return None

# --- Example Usage ---
if __name__ == "__main__":
    test_user_id = 1 # Replace with a valid user ID for testing

    permissions = get_user_permissions(test_user_id)

    if permissions:
        print("\n--- User Permissions ---")
        print(f"User ID: {permissions.get('user_id')}")
        print(f"WordPress Role: {permissions.get('wp_role')}")

        rcp_info = permissions.get('rcp', {})
        print("RCP Info:")
        print(f"  Level Name: {rcp_info.get('membership_level_name')}")
        print(f"  Access Level: {rcp_info.get('access_level')}")
        print(f"  Expiration: {rcp_info.get('expiration_date')}")
        print(f"  Status: {rcp_info.get('status')}")
        print("------------------------")

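        # Example authorization check. access_level is null (None) when no level
        # is found, so default it to 0 before comparing to avoid a TypeError.
        if rcp_info.get('status') == 'active' and (rcp_info.get('access_level') or 0) >= 5: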
             print("User has active membership with access level 5 or higher. Granting access.")
        else:
             print("User does not meet access requirements.")

    else:
        print(f"\nCould not retrieve permissions for user {test_user_id}.")

    # Try again to see cache hit
    print("\nRequesting permissions again...")
    permissions_again = get_user_permissions(test_user_id)
    if permissions_again:
        print("Successfully retrieved permissions (likely from cache).")
    else:
         print("Failed to retrieve permissions on second attempt.")

Python Code Explanation:

  1. Configuration: Sets up API URL, the shared API Key, Redis connection details, and cache TTL. Uses environment variables as a best practice.
  2. Redis Connection: Connects to Redis using redis-py. Includes basic error handling for the connection. decode_responses=True makes Redis return strings instead of bytes.
  3. get_user_permissions(user_id):
    • Checks if Redis client is available.
    • Constructs the cache key.
    • Tries to get data from Redis (redis_client.get).
    • Cache Hit: If data exists, it's loaded from JSON and returned.
    • Cache Miss: Calls fetch_from_wp_api.
    • If API call is successful, stores the result (as a JSON string) in Redis using redis_client.setex (which sets value and expiration time).
    • Returns the fetched data.
    • Includes error handling for Redis operations.
  4. fetch_from_wp_api(user_id):
    • Constructs the full API endpoint URL.
    • Sets the required X-API-Key header.
    • Uses requests.get to call the endpoint. Includes a timeout.
    • response.raise_for_status() checks for HTTP errors (like 401 Unauthorized if the API key is wrong, 404 Not Found, 5xx Server Error).
    • Parses the JSON response.
    • Includes error handling for network issues and JSON decoding errors.
  5. Example Usage (if __name__ == "__main__":)
    • Demonstrates how to call get_user_permissions.
    • Prints the retrieved data.
    • Shows a basic example of checking permissions based on the fetched data.
    • Calls the function again to show the cache hit message.
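
The permission check in the example usage can be pulled into a small reusable function. The sketch below is one possible shape, not part of the original script: has_access and min_level are hypothetical names. The endpoint returns access_level as null when no membership level is found, and in Python comparing None with an integer raises TypeError, so the helper treats a missing level as "no access".

```python
from typing import Optional


def has_access(permissions: Optional[dict], min_level: int = 5) -> bool:
    """Return True only for an active membership at or above min_level."""
    if not permissions:
        return False

    rcp = permissions.get("rcp") or {}
    if rcp.get("status") != "active":
        return False

    level = rcp.get("access_level")
    if level is None:
        # No membership level recorded: deny rather than guess.
        return False

    try:
        return int(level) >= min_level
    except (TypeError, ValueError):
        # Unexpected type in the payload: deny access.
        return False


print(has_access({"rcp": {"status": "active", "access_level": 5}}))
print(has_access({"rcp": {"status": "active", "access_level": None}}))
```

Denying by default on any unexpected input keeps a malformed or partial response from granting access by accident.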

Before Running:

  1. WordPress:
    • Add the PHP code to your site.
    • Generate a strong secret API key and replace the placeholder in the PHP code (or better, define it in wp-config.php).
  2. Python:
    • Install required libraries: pip install requests redis
    • Configure the Python script with your actual WordPress API URL, the same secret API Key you set in WordPress, and your Redis connection details. Using environment variables is recommended for sensitive data like the API key.
  3. Redis: Ensure your Redis server is running and accessible from where you run the Python script.
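
For the key-related steps above, the following sketch shows one way to generate a strong key and to refuse to start when WP_API_KEY is missing or still set to the placeholder. generate_api_key and load_api_key are hypothetical helpers, not part of the original script; the 32-byte key length is an assumption, not a requirement of WordPress or RCP.

```python
import os
import secrets
from typing import Mapping

# Placeholder string used in this post's examples; never a valid key.
PLACEHOLDER_KEY = "YOUR_SUPER_SECRET_API_KEY"


def generate_api_key(num_bytes: int = 32) -> str:
    """Create a random URL-safe key to share between WordPress and Python."""
    return secrets.token_urlsafe(num_bytes)


def load_api_key(env: Mapping[str, str] = os.environ) -> str:
    """Return WP_API_KEY, rejecting missing or placeholder values."""
    key = env.get("WP_API_KEY", "").strip()
    if not key or key == PLACEHOLDER_KEY:
        raise RuntimeError("WP_API_KEY is not set to a real secret")
    return key


if __name__ == "__main__":
    # Print a fresh key once, then store it in wp-config.php and the
    # Python app's environment.
    print(generate_api_key())
```

Failing at startup is usually easier to diagnose than a stream of 401 responses from the endpoint later on.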

This setup provides a robust way for your Python application to get up-to-date (within the cache TTL) user permissions from WordPress without embedding potentially stale data in the JWT.
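
One gap in the script: get_user_permissions returns None whenever Redis is unavailable, and the fallback to the API is only hinted at in comments. A minimal sketch of that fallback follows, under the assumption that the cache and the API call are passed in as arguments so the function can be tried without a live Redis or WordPress. get_with_fallback, cache, and fetch are hypothetical names; in the real script, cache would be redis_client and fetch would be fetch_from_wp_api.

```python
import json
from typing import Callable, Optional


def get_with_fallback(
    user_id: int,
    cache,  # any object with get(key) and setex(key, ttl, value), or None
    fetch: Callable[[int], Optional[dict]],
    ttl_seconds: int = 300,
) -> Optional[dict]:
    """Cache-aside lookup that still works when the cache is down."""
    key = f"user_perms:{user_id}"

    try:
        cached = cache.get(key) if cache is not None else None
        if cached:
            return json.loads(cached)
    except Exception:
        # Cache unreachable or entry corrupt: fall through to the API.
        # In real code, catch redis.exceptions.RedisError and ValueError.
        pass

    data = fetch(user_id)
    if data and cache is not None:
        try:
            cache.setex(key, ttl_seconds, json.dumps(data))
        except Exception:
            pass  # Caching is best-effort; the fetched data is still returned.
    return data
```

The trade-off is that during a Redis outage every request reaches WordPress directly, so this suits low-traffic setups or needs a rate limit in front of it.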
