楼主: yunnandlg
28475 25

[学科前沿] The Conservative Formula: Quantitative Investing Made Easy [推广有奖]

21
宽客老丁(未真实交易用户) 发表于 昨天 09:22
云卷云舒 不猜阴晴
一念放空 毋意自明
风过千山 不执枯荣
万事留余 毋必轻盈

22
宽客老丁(未真实交易用户) 发表于 昨天 09:24
云卷云舒 不猜阴晴
一念放空 毋意自明
风过千山 不执枯荣
万事留余 毋必轻盈

23
宽客老丁(未真实交易用户) 发表于 昨天 09:26
云卷云舒 不猜阴晴
一念放空 毋意自明
风过千山 不执枯荣
万事留余 毋必轻盈

24
宽客老丁(未真实交易用户) 发表于 昨天 09:29
云卷云舒 不猜阴晴
一念放空 毋意自明
风过千山 不执枯荣
万事留余 毋必轻盈

25
宽客老丁(未真实交易用户) 发表于 昨天 09:32
云卷云舒 不猜阴晴
一念放空 毋意自明
风过千山 不执枯荣
万事留余 毋必轻盈

26
宽客老丁(未真实交易用户) 发表于 昨天 11:18
import requests
import json
import time
from typing import Optional, Dict, List, Any, Union

def get_unix_time() -> int:
    """获取当前Unix时间戳"""
    return int(time.time())

def chang_xq_stocks(stock: str) -> str:
    """
    转换股票代码为雪球格式
    Args:
        stock: 股票代码,支持多种格式
    Returns:
        雪球格式的股票代码 (如: SH000001, SZ000001)
    """
    stock = stock.upper()
   
    # 如果已经是SH/SZ开头,直接返回
    if stock.startswith(('SH', 'SZ')):
        return stock
   
    # 处理聚宽代码
    if stock.endswith('XSHG'):
        return 'SH' + stock[:6]
    elif stock.endswith('XSHE'):
        return 'SZ' + stock[:6]
   
    # 处理纯数字代码
    if stock.isdigit():
        # 上海股票代码以5或6开头,深圳以0、1、2、3开头
        if stock[0] in ('5', '6', '9'):
            return 'SH' + stock
        else:
            return 'SZ' + stock
   
    return stock

class XqStocks:
    """雪球自选股操作类"""
   
    DEFAULT_HEADERS = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Host': 'xueqiu.com',
        'Pragma': 'no-cache',
        'Connection': 'keep-alive',
        'Accept': '*/*',
        'Accept-Encoding': 'gzip, deflate, br',
        'Cache-Control': 'no-cache',
        'Referer': 'https://xueqiu.com/portfolios',
        'X-Requested-With': 'XMLHttpRequest',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'
    }
   
    CONFIG = {
        'login_api': 'https://xueqiu.com/user/login',
        'stocks': 'https://xueqiu.com/v4/stock/portfolio/stocks.json?size=1000&pid=%d&category=%s&type=%s',
        'poster': 'https://xueqiu.com/service/poster',
        'group_list': 'https://xueqiu.com/v4/stock/portfolio/list.json?system=true'
    }
   
    def __init__(self, **kwargs):
        """
        初始化
        Args:
            account: 登录账号(手机号)
            password: 密码
            group_name: 默认分组名
            username: 用户名(可选)
            proxy_ip: 代理IP(可选)
            proxy_port: 代理端口(可选,默认808)
            proxy_user: 代理用户名(可选)
            proxy_pwd: 代理密码(可选)
        """
        self.session = requests.Session()
        self.session.headers.update(self.DEFAULT_HEADERS)
        
        # 设置代理
        self._setup_proxy(kwargs)
        
        self.account_config = kwargs
        self.group_infos: Dict[str, Dict[str, Any]] = {}
        self.defalut_group_name = kwargs.get('group_name', '')
        self.defalut_group_info = {
            'category': 2,
            'type': 1,
            'id': -1,
            'name': ''
        }
        self.is_logon = False
   
    def _setup_proxy(self, config: Dict[str, Any]) -> None:
        """设置代理"""
        if 'proxy_ip' not in config:
            return
            
        proxy_ip = config['proxy_ip']
        proxy_port = config.get('proxy_port', 808)
        
        # 构建代理URL
        if 'proxy_user' in config and 'proxy_pwd' in config:
            proxy_auth = f"{config['proxy_user']}:{config['proxy_pwd']}@"
            proxy_url = f"http://{proxy_auth}{proxy_ip}:{proxy_port}"
        else:
            proxy_url = f"http://{proxy_ip}:{proxy_port}"
        
        self.session.proxies = {
            "http": proxy_url,
            "https": proxy_url,
        }
   
    def login(self) -> bool:
        """登录雪球账号"""
        login_data = {
            'username': self.account_config.get('username', ''),
            'areacode': '86',
            'telephone': self.account_config['account'],
            'remember_me': '0',
            'password': self.account_config['password']
        }
        
        try:
            response = self.session.post(self.CONFIG['login_api'], data=login_data, timeout=10)
            response.raise_for_status()
            login_status = response.json()
            
            if 'error_description' in login_status:
                return False
            
            self.update_group_infos()
            self.is_logon = True
            return True
            
        except requests.exceptions.RequestException as e:
            print(f"登录失败: {e}")
            return False
   
    def get_group_params(self, group_name: Optional[str] = None) -> Dict[str, Any]:
        """获取分组参数"""
        if group_name is None:
            group_name = self.defalut_group_name
        
        # 如果分组名非空且不在已知分组中,则创建分组
        if group_name and group_name not in self.group_infos:
            if self.create_group(group_name):
                self.update_group_infos()
        
        return self.group_infos.get(group_name, self.defalut_group_info)
   
    def get_stocks(self, group_name: Optional[str] = None) -> List[str]:
        """获取指定分组下的自选股列表"""
        group_params = self.get_group_params(group_name)
        
        try:
            url = self.CONFIG['stocks'] % (
                group_params.get('id', -1),
                group_params.get('category', 1),
                group_params.get('type', 1)
            )
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            
            data = response.json()
            stocks = data.get('stocks', [])
            
            return [
                x['code'] for x in stocks
                if x.get('exchange') in ('SH', 'SZ')
            ]
            
        except requests.exceptions.RequestException as e:
            print(f"获取股票列表失败: {e}")
            return []
   
    def update_group_infos(self) -> None:
        """更新分组信息"""
        try:
            response = self.session.get(self.CONFIG['group_list'], timeout=10)
            response.raise_for_status()
            
            data = response.json()
            portfolios = data.get('portfolios', [])
            
            self.group_infos = {}
            for portfolio in portfolios:
                name = portfolio['name']
                self.group_infos[name] = {
                    'category': portfolio['portfolio']['category'],
                    'type': portfolio['type'],
                    'id': portfolio['id'],
                    'name': name
                }
               
        except requests.exceptions.RequestException as e:
            print(f"更新分组信息失败: {e}")
   
    def create_group(self, group_name: str) -> bool:
        """创建分组"""
        post_data = {
            'data[_]': get_unix_time(),
            'data[pname]': group_name,
            'url': '/stock/portfolio/create.json'
        }
        
        try:
            response = self.session.post(self.CONFIG['poster'], data=post_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result.get('success', False)
            
        except requests.exceptions.RequestException as e:
            print(f"创建分组失败: {e}")
            return False
   
    def _modify_stock(self, stock: str, action: str, group_name: Optional[str] = None) -> bool:
        """修改股票(添加/删除)的通用方法"""
        stock = chang_xq_stocks(stock)
        post_data = {
            'data[_]': get_unix_time(),
            'data[code]': stock,
            'url': f'/stock/portfolio/{action}.json'
        }
        
        # 添加股票需要额外参数
        if action == 'addstock':
            post_data['data[isnotice]'] = 1
            post_data['data[targetpercent]'] = 7
        
        group_params = self.get_group_params(group_name)
        if group_params.get('name'):
            post_data_key = 'data[pnames]' if action == 'addstock' else 'data[pname]'
            post_data[post_data_key] = group_params['name']
            post_data['data[pids]'] = group_params['type']
        
        try:
            response = self.session.post(self.CONFIG['poster'], data=post_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result.get('success', False)
            
        except requests.exceptions.RequestException as e:
            print(f"{'添加' if action == 'addstock' else '删除'}股票失败: {e}")
            return False
   
    def add_stock(self, stock: str, group_name: Optional[str] = None) -> bool:
        """向分组添加股票"""
        return self._modify_stock(stock, 'addstock', group_name)
   
    def del_stock(self, stock: str, group_name: Optional[str] = None) -> bool:
        """从分组删除股票"""
        return self._modify_stock(stock, 'delstock', group_name)
   
    def sync(self, stocks: List[str], group_name: Optional[str] = None) -> str:
        """
        同步自选股列表到雪球
        Args:
            stocks: 股票代码列表
            group_name: 分组名
        Returns:
            成功返回空字符串,失败返回错误信息
        """
        try:
            if not self.is_logon:
                if not self.login():
                    return '登录雪球失败'
            
            # 转换股票代码格式
            target_stocks = [chang_xq_stocks(stock) for stock in stocks]
            
            # 获取当前自选股
            current_stocks = self.get_stocks(group_name)
            
            # 删除不再需要的股票
            for stock in (x for x in current_stocks if x not in target_stocks):
                self.del_stock(stock, group_name)
            
            # 添加新股票
            for stock in (x for x in target_stocks if x not in current_stocks):
                self.add_stock(stock, group_name)
            
            return ''
            
        except Exception as e:
            return f'同步雪球失败: {str(e)}'

# 使用示例
if __name__ == "__main__":
    # 创建对象
    xq = XqStocks(
        username='',            # 用户名(可选)
        account='151022',       # 登录账号(手机号)
        password='y488',        # 密码
        group_name='测试',       # 默认分组名
        # proxy_ip='127.0.0.1',  # 代理IP(可选)
        # proxy_port=8080,       # 代理端口(可选)
        # proxy_user='user',     # 代理用户名(可选)
        # proxy_pwd='pwd'        # 代理密码(可选)
    )
   
    # 测试登录
    if not xq.login():
        print('登录失败')
    else:
        print('登录成功')
        
        # 获取自选股
        stocks = xq.get_stocks()
        print('当前自选股:', stocks)
        
        # 测试添加股票
        print('添加股票结果:', xq.add_stock('SH000016'))
        
        # 测试删除股票
        print('删除股票结果:', xq.del_stock('SH600392'))
        
        # 再次获取自选股
        print('操作后自选股:', xq.get_stocks())
        
        # 同步测试
        target_stocks = ['000001.XSHE', '399678.XSHE', '600392.XSHG']
        result = xq.sync(target_stocks)
        if result:
            print('同步失败:', result)
        else:
            print('同步成功')
            print('同步后自选股:', xq.get_stocks())
        
        # 获取所有分组
        print('\n所有分组列表:')
        for group_name in xq.group_infos:
            stocks = xq.get_stocks(group_name)
            print(f'{group_name}: {stocks}')

您需要登录后才可以回帖 登录 | 我要注册

本版微信群
加好友,备注cda
拉您进交流群
GMT+8, 2026-1-21 07:29