Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Value investing analysis tool for Chinese A-share stocks with screening, financial analysis, industry comparison, and DCF valuation.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/valuation_calculator.py
#!/usr/bin/env python3
"""
Valuation calculator for Chinese A-share stocks.

Provides several valuation methods: DCF, DDM and relative valuation.

Dependencies: pip install pandas numpy
"""

import argparse
import json
import math
import sys
from datetime import datetime
from typing import Optional, Dict

try:
    import pandas as pd
    import numpy as np
except ImportError:
    print("错误: 请先安装依赖库")
    print("pip install pandas numpy")
    sys.exit(1)


class ValuationCalculator:
    """Run DCF, DDM and relative valuations over one stock's data dict.

    The data dict is read with ``.get`` throughout, so missing sections
    degrade to an ``"error"`` entry in the result instead of raising.
    """

    def __init__(self, stock_data: Optional[Dict] = None):
        """Store ``stock_data`` (an empty dict when omitted)."""
        self.stock_data = stock_data or {}
        self.results = {}

    def load_data(self, file_path: str):
        """Load stock data from a UTF-8 encoded JSON file.

        Raises whatever ``open`` / ``json.load`` raise for a missing or
        malformed file.
        """
        with open(file_path, 'r', encoding='utf-8') as f:
            self.stock_data = json.load(f)

    def dcf_valuation(self, discount_rate: float = 10,
                      forecast_years: int = 5,
                      terminal_growth: float = 3) -> Dict:
        """Discounted cash flow valuation.

        Args:
            discount_rate: Discount rate in percent.
            forecast_years: Number of explicitly forecast years.
            terminal_growth: Perpetual growth rate in percent.

        Returns:
            A dict with ``method``, ``parameters``, ``calculation``,
            ``intrinsic_value`` and ``per_share_value``. On failure the
            value fields stay ``None`` and an ``"error"`` message is set.
        """
        result = {
            "method": "DCF现金流折现",
            "parameters": {
                "discount_rate": discount_rate,
                "forecast_years": forecast_years,
                "terminal_growth": terminal_growth
            },
            "calculation": {},
            "intrinsic_value": None,
            "per_share_value": None
        }

        # Fetch cash flow statements and basic company info.
        cash_flow = self.stock_data.get('financial_data', {}).get('cash_flow', [])
        basic_info = self.stock_data.get('basic_info', {})

        if not cash_flow:
            result["error"] = "无法获取现金流数据"
            return result

        # The Gordon terminal value divides by (r - tg): an equal rate would
        # divide by zero and a lower rate would yield a negative terminal value.
        if discount_rate <= terminal_growth:
            result["error"] = "折现率必须大于永续增长率"
            return result

        try:
            # Historical free cash flow from the four most recent entries.
            # NOTE(review): summing them as an annual figure assumes each
            # entry is a single-quarter (non-cumulative) amount - confirm
            # against the data source.
            fcf_history = []
            for cf in cash_flow[:4]:
                ocf = self._safe_float(cf.get('经营活动产生的现金流量净额', 0))
                if ocf is None:
                    continue
                capex = self._safe_float(
                    cf.get('购建固定资产、无形资产和其他长期资产支付的现金', 0)
                )
                # A missing capex figure is treated as zero rather than
                # aborting the whole valuation.
                fcf_history.append(ocf - abs(capex or 0.0))

            if not fcf_history:
                result["error"] = "无法计算自由现金流"
                return result

            # Annualized FCF (simple sum).
            annual_fcf = sum(fcf_history)
            result["calculation"]["当前年化FCF"] = annual_fcf

            # Growth estimate from the latest historical net profit growth.
            indicators = self.stock_data.get('financial_indicators', [])
            growth_rate = 10  # default: 10%
            if indicators:
                hist_growth = self._safe_float(indicators[0].get('净利润增长率'))
                # "is not None" so that a genuine 0% growth figure is used
                # instead of being mistaken for missing data.
                if hist_growth is not None and -50 < hist_growth < 100:
                    growth_rate = min(max(hist_growth, 0), 30)  # clamp to 0-30%

            result["calculation"]["预计增长率"] = growth_rate

            # Project and discount future cash flows.
            r = discount_rate / 100
            g = growth_rate / 100
            tg = terminal_growth / 100

            pv_fcf = 0
            future_fcf = []
            for year in range(1, forecast_years + 1):
                fcf = annual_fcf * ((1 + g) ** year)
                pv = fcf / ((1 + r) ** year)
                pv_fcf += pv
                future_fcf.append({"year": year, "fcf": fcf, "pv": pv})

            result["calculation"]["预测期现金流现值"] = pv_fcf
            result["calculation"]["future_fcf"] = future_fcf

            # Terminal value (Gordon growth model).
            terminal_fcf = annual_fcf * ((1 + g) ** forecast_years) * (1 + tg)
            terminal_value = terminal_fcf / (r - tg)
            pv_terminal = terminal_value / ((1 + r) ** forecast_years)

            result["calculation"]["终值"] = terminal_value
            result["calculation"]["终值现值"] = pv_terminal

            # Total value.
            total_value = pv_fcf + pv_terminal
            result["intrinsic_value"] = total_value

            # Per-share value, only when the share count is known.
            total_shares = self._parse_shares(basic_info.get('total_shares', ''))
            if total_shares:
                per_share = total_value / total_shares
                result["per_share_value"] = per_share

        except Exception as e:
            # Best-effort boundary: malformed input is reported, not raised.
            result["error"] = str(e)

        return result

    def ddm_valuation(self, required_return: float = 10,
                      dividend_growth: float = None) -> Dict:
        """Dividend discount valuation (Gordon growth model).

        Args:
            required_return: Required rate of return in percent.
            dividend_growth: Dividend growth rate in percent. When ``None``
                it is derived as the compound growth rate between the first
                and last usable entries of the dividend history.

        Returns:
            A dict with ``method``, ``parameters``, ``calculation``,
            ``intrinsic_value`` and ``per_share_value``; ``"error"`` is set
            when the model cannot be applied.
        """
        result = {
            "method": "DDM股息折现",
            "parameters": {
                "required_return": required_return
            },
            "calculation": {},
            "intrinsic_value": None,
            "per_share_value": None
        }

        # Fetch dividend data.
        dividend = self.stock_data.get('dividend', {})
        dividend_history = dividend.get('dividend_history', [])
        basic_info = self.stock_data.get('basic_info', {})

        if not dividend_history:
            result["error"] = "无分红历史数据"
            result["note"] = "该公司可能分红较少或不稳定,不适合DDM估值"
            return result

        try:
            # Collect positive dividends from the first five history entries.
            # NOTE(review): index 0 is used as the current dividend, so the
            # history is presumably ordered newest-first - confirm.
            recent_dividends = []
            for d in dividend_history[:5]:
                div = self._safe_float(d.get('每股股利', d.get('派息', 0)))
                if div and div > 0:
                    recent_dividends.append(div)

            if len(recent_dividends) < 2:
                result["error"] = "分红数据不足,无法使用DDM"
                return result

            current_dividend = recent_dividends[0]
            result["calculation"]["当前每股股息"] = current_dividend

            # Derive the dividend growth rate when the caller gave none.
            # At least two data points are guaranteed by the check above.
            if dividend_growth is None:
                years = len(recent_dividends) - 1
                cagr = (recent_dividends[0] / recent_dividends[-1]) ** (1 / years) - 1
                dividend_growth = cagr * 100

            # Clamp growth to [0, required_return - 1].
            dividend_growth = min(max(dividend_growth, 0), required_return - 1)
            result["parameters"]["dividend_growth"] = dividend_growth
            result["calculation"]["股息增长率"] = dividend_growth

            # Gordon model: P = D1 / (r - g)
            r = required_return / 100
            g = dividend_growth / 100
            d1 = current_dividend * (1 + g)

            if r <= g:
                result["error"] = "增长率不能大于或等于要求回报率"
                return result

            per_share_value = d1 / (r - g)
            result["per_share_value"] = per_share_value
            result["calculation"]["下期预期股息(D1)"] = d1

            # Total value, only when the share count is known.
            total_shares = self._parse_shares(basic_info.get('total_shares', ''))
            if total_shares:
                result["intrinsic_value"] = per_share_value * total_shares

        except Exception as e:
            # Best-effort boundary: malformed input is reported, not raised.
            result["error"] = str(e)

        return result

    def _assess_percentile(self, percentile: float) -> str:
        """Map a historical percentile to a valuation-level description."""
        if percentile < 20:
            return "处于历史低位,可能被低估"
        elif percentile < 40:
            return "处于历史较低水平"
        elif percentile < 60:
            return "处于历史中等水平"
        elif percentile < 80:
            return "处于历史较高水平"
        return "处于历史高位,估值偏贵"

    def relative_valuation(self) -> Dict:
        """Relative valuation against the stock's own historical percentiles.

        Returns:
            A dict with ``method``, ``current_valuation``, ``comparison``
            and ``assessment``; ``"error"`` is set if processing fails.
        """
        result = {
            "method": "相对估值",
            "current_valuation": {},
            "comparison": {},
            "assessment": {}
        }

        valuation = self.stock_data.get('valuation', {})
        price = self.stock_data.get('price', {})
        basic_info = self.stock_data.get('basic_info', {})

        try:
            current_pe = self._safe_float(basic_info.get('pe_ttm'))
            current_pb = self._safe_float(basic_info.get('pb'))

            # Fall back to the latest valuation record per metric, so a valid
            # PB from basic_info is never overwritten because PE was missing.
            latest = valuation.get('latest')
            if latest:
                if not current_pe:
                    current_pe = self._safe_float(latest.get('pe'))
                if not current_pb:
                    current_pb = self._safe_float(latest.get('pb'))

            result["current_valuation"] = {
                "PE_TTM": current_pe,
                "PB": current_pb,
                "当前价格": price.get('latest_price')
            }

            pe_percentile = valuation.get('pe_percentile')
            pb_percentile = valuation.get('pb_percentile')

            result["comparison"]["PE历史分位数"] = pe_percentile
            result["comparison"]["PB历史分位数"] = pb_percentile

            # Valuation assessment.
            if pe_percentile is not None:
                result["assessment"]["PE评估"] = self._assess_percentile(pe_percentile)
            if pb_percentile is not None:
                result["assessment"]["PB评估"] = self._assess_percentile(pb_percentile)

            # Fair price heuristic: scale the current PE by 50 / percentile,
            # then scale the current price by the same ratio.
            current_price = price.get('latest_price', 0)
            if current_pe and pe_percentile and pe_percentile > 0 and current_price and current_pe > 0:
                fair_pe = current_pe * (50 / pe_percentile)
                fair_price_pe = current_price * (fair_pe / current_pe)
                result["comparison"]["基于PE的合理价格"] = round(fair_price_pe, 2)

        except Exception as e:
            # Best-effort boundary: malformed input is reported, not raised.
            result["error"] = str(e)

        return result

    def calculate_margin_of_safety(self, intrinsic_value: float,
                                   current_price: float,
                                   margin_pct: float = 30) -> Dict:
        """Compute the margin of safety of ``current_price``.

        Args:
            intrinsic_value: Estimated intrinsic value per share.
            current_price: Current market price per share.
            margin_pct: Required margin of safety in percent.

        Returns:
            A dict echoing the inputs. When both values are truthy it also
            holds ``actual_margin_of_safety``, ``safety_price`` and a
            ``conclusion`` string.
        """
        result = {
            "intrinsic_value": intrinsic_value,
            "current_price": current_price,
            "margin_of_safety_required": margin_pct
        }

        if not (intrinsic_value and current_price):
            return result

        actual_margin = (intrinsic_value - current_price) / intrinsic_value * 100
        safety_price = intrinsic_value * (1 - margin_pct / 100)

        result["actual_margin_of_safety"] = round(actual_margin, 2)
        result["safety_price"] = round(safety_price, 2)

        if current_price < safety_price:
            result["conclusion"] = "低估 - 当前价格低于安全边际价格,具有投资价值"
        elif current_price < intrinsic_value:
            result["conclusion"] = "合理偏低 - 当前价格低于内在价值,但未达到安全边际"
        else:
            result["conclusion"] = "高估 - 当前价格高于内在价值"

        return result

    def comprehensive_valuation(self, discount_rate: float = 10,
                                terminal_growth: float = 3,
                                margin_of_safety: float = 30) -> Dict:
        """Combine DCF, DDM and relative valuation into one report.

        Args:
            discount_rate: Discount rate in percent, passed to the DCF.
            terminal_growth: Perpetual growth rate in percent, passed to
                the DCF.
            margin_of_safety: Required margin of safety in percent.

        Returns:
            A dict with ``code``, ``name``, ``valuation_date``, the result
            of each method under ``methods``, and a ``summary`` averaging
            the per-share values that the methods produced.
        """
        result = {
            "code": self.stock_data.get('code', ''),
            "name": self.stock_data.get('basic_info', {}).get('name', ''),
            "valuation_date": datetime.now().isoformat(),
            "methods": {},
            "summary": {}
        }

        # Current market price.
        current_price = self.stock_data.get('price', {}).get('latest_price')

        # DCF valuation.
        dcf = self.dcf_valuation(discount_rate=discount_rate,
                                 terminal_growth=terminal_growth)
        result["methods"]["DCF"] = dcf

        # DDM valuation (runs with its own default required return).
        ddm = self.ddm_valuation()
        result["methods"]["DDM"] = ddm

        # Relative valuation.
        relative = self.relative_valuation()
        result["methods"]["相对估值"] = relative

        # Collect every method that produced a per-share value.
        valuations = []
        if dcf.get('per_share_value'):
            valuations.append(("DCF", dcf['per_share_value']))
        if ddm.get('per_share_value'):
            valuations.append(("DDM", ddm['per_share_value']))
        if relative.get('comparison', {}).get('基于PE的合理价格'):
            valuations.append(("相对估值", relative['comparison']['基于PE的合理价格']))

        if valuations:
            avg_value = sum(v[1] for v in valuations) / len(valuations)
            result["summary"]["平均内在价值"] = round(avg_value, 2)
            result["summary"]["估值方法数"] = len(valuations)
            result["summary"]["各方法估值"] = {k: round(v, 2) for k, v in valuations}

            # Margin of safety against the averaged value.
            if current_price:
                margin = self.calculate_margin_of_safety(
                    avg_value, current_price, margin_of_safety
                )
                result["summary"]["安全边际分析"] = margin
                result["summary"]["当前价格"] = current_price
                result["summary"]["建议买入价"] = margin.get('safety_price')
                result["summary"]["投资结论"] = margin.get('conclusion')

        return result

    def _safe_float(self, value) -> Optional[float]:
        """Convert ``value`` to a finite float, or return ``None``.

        ``None``, ``''`` and ``'--'`` count as missing. For strings the
        characters ``%``, ``,`` and ``亿`` are removed before conversion.
        NaN and infinite values are also reported as ``None`` so they cannot
        propagate through later arithmetic.

        NOTE(review): the ``亿`` suffix is stripped without scaling, so
        ``"1.5亿"`` yields ``1.5`` - confirm callers expect that unit.
        """
        if value is None or value == '' or value == '--':
            return None
        try:
            if isinstance(value, str):
                value = value.replace('%', '').replace(',', '').replace('亿', '')
            number = float(value)
        except (ValueError, TypeError):
            return None
        return number if math.isfinite(number) else None

    def _parse_shares(self, shares_str: str) -> Optional[float]:
        """Parse a share count such as ``"12.5亿"``, ``"3000万股"`` or ``1e9``.

        Thousands separators and a ``股`` suffix are ignored; ``亿`` scales
        by 1e8 and ``万`` by 1e4. Returns ``None`` for empty, unparseable or
        non-finite input.
        """
        if not shares_str:
            return None

        text = str(shares_str).strip().replace(',', '').replace('股', '')
        multiplier = 1
        if '亿' in text:
            multiplier = 100000000
            text = text.replace('亿', '')
        elif '万' in text:
            multiplier = 10000
            text = text.replace('万', '')

        try:
            shares = float(text) * multiplier
        except (ValueError, TypeError):
            return None
        return shares if math.isfinite(shares) else None


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the valuation calculator."""
    parser = argparse.ArgumentParser(description="A股估值计算器")
    parser.add_argument("--input", type=str, help="输入数据文件 (JSON)")
    parser.add_argument("--code", type=str, help="股票代码 (如果不提供input)")
    parser.add_argument("--methods", type=str, default="all",
                        help="估值方法: dcf/ddm/relative/all")
    # argparse %-formats help strings, so a literal percent sign must be
    # written as "%%"; a bare "%" makes --help raise ValueError.
    parser.add_argument("--discount-rate", type=float, default=10,
                        help="折现率 (%%)")
    parser.add_argument("--terminal-growth", type=float, default=3,
                        help="永续增长率 (%%)")
    parser.add_argument("--margin-of-safety", type=float, default=30,
                        help="安全边际要求 (%%)")
    parser.add_argument("--output", type=str, help="输出文件路径 (JSON)")
    return parser


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    args = build_parser().parse_args(argv)

    calculator = ValuationCalculator()

    if args.input:
        calculator.load_data(args.input)
    elif args.code:
        # A bare stock code is not enough: the data must be fetched first.
        print("请先使用 data_fetcher.py 获取数据,然后使用 --input 参数")
        sys.exit(1)
    else:
        print("请提供 --input 或 --code 参数")
        sys.exit(1)

    # Dispatch table for single-method runs; any other value of --methods
    # falls through to the comprehensive valuation.
    valuation_methods = {
        "dcf": lambda: calculator.dcf_valuation(
            discount_rate=args.discount_rate,
            terminal_growth=args.terminal_growth
        ),
        "ddm": calculator.ddm_valuation,
        "relative": calculator.relative_valuation,
    }

    if args.methods in valuation_methods:
        result = valuation_methods[args.methods]()
    else:
        result = calculator.comprehensive_valuation(
            discount_rate=args.discount_rate,
            terminal_growth=args.terminal_growth,
            margin_of_safety=args.margin_of_safety
        )

    # Output.
    output_json = json.dumps(result, ensure_ascii=False, indent=2, default=str)

    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(output_json)
        print(f"估值结果已保存到: {args.output}")
    else:
        print(output_json)


if __name__ == "__main__":
    main()