PySpark时序数据描述

PySpark时序数据描述
为更好的洞察和处理大规模时序数据的特性,本文针对大规模时序数据,从基本统计特性,分布,序列内部检测三方面,提供Spark和借助numpy,scipy,statsmodels封装的成UDF函数脚本与理论讲解。不仅帮助了解序列本身的规律,也是机器学习建模的特征,同时,亦可作为分类分层建模的依据。因为是针对大规模时序数据,所以常规的画图可视化的方式检测其分布等特性并不合适,所以需要基于统计计算的方式给出确定的检验和描述。

一、基本统计特性

基本统计特性包括长度,均值,间断时长,标准差等,用于对序列有大致了解。

1.序列长度

针对时序数据,一般首先会关注序列的长度 n n n,较短序列意味着数据的潜在可变性无法判断,随机成分较大,无法通过该序列历史数据揭示出规律性。从理论角度来说,样本的数量要比预测模型中参数个数更多。从经典时间序列分解理论出发,序列短则无法准确的推断其趋势、季节波动,对ARIMA等模型,将导致其自相关系数,平稳性检验等变得相对不可靠,机器学习模型,同样需要较大的样本量来避免过拟合风险。针对短序列,应该选择使用需要参数更少更简单的模型,比如移动平均,规则模型,线性回归,同时减低模型复杂。如果待预测问题属于一个较大粒度,比如门店-月销售额的短序列预测,相比较在门店-天-sku预测上更容易预测,因为潜在可变性较小,样本量(序列长度)问题相对没有那么突出。

2.销售时长

销售时长,描述序列发生销售的时间跨度,销售时长较短的序列表明是最近上线的新品。

S a l e s D a y s = m a x ( X t ) − m i n ( X t ) SalesDays=max(X_{t})-min(X_{t}) SalesDays=max(Xt)min(Xt)

3.间断时长

中断时长,用来度量序列完整性,中断的时间点越少,序列越完整。

I n t e r m D a y s = 1 + m a x ( X t ) − m i n ( X t ) − n IntermDays=1+max(X_{t})-min(X_{t})-n IntermDays=1+max(Xt)min(Xt)n

4.缺失值占比

通过比例的方式度量销售中断(缺失)的严重程度
M i s s i n g R a t i o = n 1 + m a x ( X t ) − m i n ( X t ) MissingRatio=\frac{n}{1+max(X_{t})-min(X_{t})} MissingRatio=1+max(Xt)min(Xt)n

5.均值(mean)

序列的均值描述该序列发生销售大致量级。

x ˉ = ∑ i = 1 n x i n \bar{x}=\frac{\sum_{i=1}^{n} x_{i}}{n} xˉ=ni=1nxi

6.标准差(std)

用于度量序列的波动性,标准差越大,说明序列波动性越强。

S = ∑ i = 1 n ( x i − x ˉ ) 2 n − 1 S=\sqrt{\frac{\sum_{i=1}^{n}\left(x_{i}-\bar{x}\right)^{2}}{n-1}} S=n1i=1n(xixˉ)2

7.C.V系数

需要比较两组序列离散度的时,如果不同序列之间测量尺度相差大,或量纲不同,使用标准差来进行比较不是合理的方式,此时应该使用变异系数这一指标,在标准差的基础上除以原始数据平均数,消除测量尺度和量纲的影响。

C . V = S x ˉ × 100 % C . V=\frac{S}{\bar{x}} \times 100 \% C.V=xˉS×100%

二、分布特性

时序的分布特性包含,偏度,峰度和在此之上的Jarque-Bera测试,销售量 y y y的分布不总是连续正态分布,使用机器学习建模时,需要针对不同的分布设计不同的损失函数或者选择更合适的模型。

8.偏度(skewness)

偏度描述的是某总体取值分布的对称性,为样本的三阶标准矩。
Skew ⁡ ( X ) = E [ ( X − μ σ ) 3 ] 其 中 , μ : 样 本 均 值 , δ : 标 准 差 , E : 均 值 \operatorname{Skew}(X)=E\left[\left(\frac{X-\mu}{\sigma}\right)^{3}\right] \\ 其中,μ:样本均值,δ:标准差,E:均值 Skew(X)=E[(σXμ)3],μ:,δ:,E:
正态分布(偏度=0),右偏分布(正偏,偏度>0),左偏分布(负偏分布,偏度<0)

9.峰度(Kurtosis)

Kurt ⁡ [ X ] = E [ ( X − μ σ ) 4 ] 其 中 μ : 样 本 均 值 , δ : 标 准 差 , E : 取 均 值 操 作 \operatorname{Kurt}[X]=\mathrm{E}\left[\left(\frac{X-\mu}{\sigma}\right)^{4}\right] \\ 其中μ:样本均值,δ:标准差,E:取均值操作 Kurt[X]=E[(σXμ)4]μ:δ:E:

峰度是描述总体中所有取值分布形态陡缓程度的统计量,峰度定义为四阶标准矩。峰度为0表示该总体数据分布与正态分布的陡缓程度相同;峰度的绝对值数值越大表示其分布形态的陡缓程度与正态分布的差异程度越大。峰度大于0表示该总体数据分布与正态分布相比较为陡峭,为尖顶峰;峰度小于0表示该总体数据分布与正态分布相比较为平坦,为平顶峰。

10.雅克-贝拉检验(Jarque-Bera)

Jarque-Bera 是一种总体分布的正态性检验,当序列服从正态分布时,JB统计量:
J B = n ( S 2 6 + ( K − 3 ) 2 24 ) 其 中 : n 为 样 本 规 模 , S 、 K 分 别 为 偏 度 和 峰 度 ; 原 假 设 H 0 : 样 本 服 从 正 态 分 布 , 来 自 一 个 正 态 分 布 的 总 体 , 那 么 J B 检 验 渐 近 于 χ ( 2 ) 分 布 备 择 假 设 H 1 : 样 本 不 服 从 正 态 分 布 . J B=n\left(\frac{S^{2}}{6}+\frac{(K-3)^{2}}{24}\right) \\其中: n为样本规模,S、K分别为偏度和峰度; \\原假设 H0:样本服从正态分布,来自一个正态分布的总体,那么JB检验渐近于χ(2)分布 \\备择假设 H1:样本不服从正态分布. JB=n(6S2+24(K3)2)nSK;H0,JBχ(2)H1.

注:该检验可以通过python的如下两个模块进行

#Jarque-Bera test
Scipy.stats.jarque_bera
statsmodels.stats.stattools.jarque_bera

当然关于正态性检验的常用方式还有如Kolmogorov-Smirnor test(KS),Shapiro-Wilk test等,

由于spark.sql可以直接计算偏度和峰度,Jarque-Bera可以通过二者计算得到,所以考虑到便捷和通用性。所以这里使用Jarque-Bera,以上基本统计特性和分布特征的实例代码Spark.SQL如下:

select 
shop_id,
sku_id,
avg(sale_qty) as sale_avg,
stddev_samp(sale_qty) as sale_std,
stddev_samp(sale_qty)/avg(sale_qty) as cv_coef,
skewness(sale_qty) as sale_skew,
kurtosis(sale_qty) as sale_kurt,
min(dt) as start_sale,
max(dt) as end_sale,
count(dt) as sale_days,
floor(datediff(max(dt), min(dt)))+1 as sale_span,
(floor(datediff(max(dt), min(dt)))+1)  - count(dt) as intermit_day,
count(dt)/(floor(datediff(max(dt), min(dt)))+1) as sale_ratio,
count(dt)*(((skewness(sale_qty)*skewness(sale_qty))+((kurtosis(sale_qty)-3)*(kurtosis(sale_qty)-3))/4)/6) as item_jb
from app.dataset_input_df_v1
where sale_qty>0
group by shop_id,sku_id

三、序列内部特性

11.长期趋势

长期趋势通过排除季节性和循环变动以及无规则变动等因素的影响,描述时序的未来大致走向和基本趋势规律

计算长期趋势可以通过移动平均和线性方程拟合来实现,以下代码为线性回归拟合方法。

from scipy.stats import linregress

def long_trend(data):
    """
    :param data: DataFrame of one series
    :return: DataFrame of data index(key) info and regression coefficient as long_trend
    """
    slop=lambda x: linregress(list(range(1,data.shape[0]+1)),data.label.values)
    trend=slop(data)[0]
    return pd.DataFrame({'shop_id':df['shop_id'].iloc[0],'sku_id':df['sku_id'].iloc[0], 'long_trend': [trend]})

注:使用线性拟合的长期趋势是,其放入的一列特征是时间点的索引,比如[2021-01-01,2021-01-02,2021-01-03]

三天的序列,其计算长期趋势时,使用其排序后的1,2,3索引作为x。

12.平稳性

使用ARMA/ARIMA这样的经典时序分析手段,其前提假设是序列具有平稳性,因此,需要对数据或其n阶差分进行平稳检验,而一种常见的方法就是ADF检验,即单位根检验,判定其生成过程是否会随时间而变化。

平稳性分为强平稳和弱平稳,强平稳的要求非常严格,它要求两组数据之间的统计性质不会随着时间改变。其要求过于严苛,理论上很难证明,实际中难以检验,所以实际上一般使用弱平稳。
E ( Y t ) = μ V a r ( Y t ) = E ( ( Y t − μ ) ) 2 = σ 2 C o v ( Y t , Y t + k ) = E [ ( Y t − μ ) ( Y t + k − μ ) ] = γ k E(Yt)=μ\\ Var(Yt)=E((Yt−μ))2=σ 2 \\ Cov(Yt ,Yt+k)=E[(Yt−μ)(Yt+k−μ)]=γk E(Yt)=μVar(Yt)=E((Ytμ))2=σ2Cov(Yt,Yt+k)=E[(Ytμ)(Yt+kμ)]=γk
也就是

(1)随机变量的期望(均值)μ不随时间t改变;

(2)任意时刻二阶矩都存在;

(3)两个随机变量之间的自相关系数,只与这两个变量的时间间隔有关,而不随时间的推移而改变。

可调用以下函数来实现,详细的针对大规模序列检测,在文末给出。

from statsmodels.tsa.stattools import adfuller

13.周期性

不是所有的时序数据都具备明显的周期性,如果存在周期,则在分析建模的时需加入周期性这一特性。

常用的周期性函数通过正弦和余弦的函数来表示,假设f(x)是以2pi为周期的函数,那么傅里叶级数为:
s ( t ) = ∑ n = 1 N ( a n cos ⁡ ( 2 π n t P ) + b n sin ⁡ ( 2 π n t P ) ) 以 年 为 周 期 的 序 列 , p = 365 , n = 10 以 月 为 周 期 的 序 列 , p = 30 , n = 5 以 周 为 周 期 的 序 列 , p = 7 , n = 3 s(t)=\sum_{n=1}^{N}\left(a_{n} \cos \left(\frac{2 \pi n t}{P}\right)+b_{n} \sin \left(\frac{2 \pi n t}{P}\right)\right)\\ 以年为周期的序列,p=365,n=10\\ 以月为周期的序列,p=30,n=5\\ 以周为周期的序列,p=7,n=3\\ s(t)=n=1N(ancos(P2πnt)+bnsin(P2πnt)),p=365,n=10,p=30,n=5,p=7,n=3
也可以使用基于n阶自相关系数的方式来检测,自相关(autocorrelation or lagged correlation)用于评估时序数据是否依赖于其过去的数据。 Y t Yt Yt Y t + h Yt+h Yt+h之间的相关系数记为自相关系数ρ(h),其函数称为自相关函数(autocorrelation function, ACF)通过计算序列 [ t 1 , t 2 , t 3 , t 4 , . . . , t n ] [{t_{1},t_{2},t_{3},t_{4},...,t_{n}}] [t1,t2,t3,t4,...,tn] [ 1 , 2 , 3 , . . . n − 1 ] [1,2,3,...n-1] [1,2,3,...n1]次ACF自相关,其n-1个自相关系数排列的局部最值点的最小公倍数,可以作为周期性,如最小公倍数为类似于7,30,12,则可以作为其周期长度。如,n-1个自相关局部最值为[7,14,21,28],则可以认为其周期长度为7,可能在实际工作中数据本身的特点,周末销售高峰,本身周期性应该是7,本该是礼拜天出现的销售最高值,但某些时候可能是礼拜六,或者在这批序列中跨越了节假日这些不寻常的情况,可能出现的局部最大自相关系数的下标是[ 7, 13, 21, 28, 32],故实践中,如只要有一个下标能被周期7整除,则判断为具有周期性,具体代码细节请查看文末的代码。

14.序列复杂度

序列数据中潜在规律成分多少可以通过计算序列复杂度来度量,复杂度越低,时序内部的有序成分越多,也更具规律性。了解序列本身蕴含的稳定或者波动性,也便于解释某些商品预测为何比其他商品预测准确性更差。近年来有不少学者提出可以使用排列熵(permutation entropy)针对时间序列本身具有的空间信号特性进行检验,该方法计算简单,抗噪声能力强,对时间敏感度高,输出结果直观,应用范围广泛。理论方面论文和详细推导可以参考《Practical considerations of permutation entropy: A tutorial review》。

from math import factorial
import numpy as np
def _embed(x, order=3, delay=1):
    N = len(x)
    Y = np.empty((order, N - (order - 1) * delay))
    for i in range(order):
        Y[i] = x[i * delay:i * delay + Y.shape[1]]
    return Y.T

def permutation_entropy(time_series, order=3, delay=1, normalize=False):
    x = np.array(time_series)
    hashmult = np.power(order, np.arange(order))
    # Embed x and sort the order of permutations
    sorted_idx = _embed(x, order=order, delay=delay).argsort(kind='quicksort')
    # Associate unique integer to each permutations
    hashval = (np.multiply(sorted_idx, hashmult)).sum(1)
    # Return the counts
    _, c = np.unique(hashval, return_counts=True)
    # Use np.true_divide for Python 2 compatibility
    p = np.true_divide(c, c.sum())
    pe = -np.multiply(p, np.log2(p)).sum()#【3】
    if normalize:
        pe /= np.log2(factorial(order))
    return pe

借助Spark.SQL实现大规模时间序列的平稳性和周期性检验的代码如下:

import datetime
import numpy as np
import pandas as pd
import scipy.signal as signal
from statsmodels.tsa.stattools import acf
from statsmodels.tsa.stattools import adfuller
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import SparkConf
from pyspark.sql.functions import pandas_udf, PandasUDFType

schema_adfuller = StructType([
    StructField("shop_number", StringType()),
    StructField("item_number", StringType()),
    StructField("adfuller", IntegerType())])


@pandas_udf(schema_adfuller, functionType=PandasUDFType.GROUPED_MAP)
def adf_test(df):
    df.sort_values(by=['date'],ascending=[True],inplace=True)
    adf_values=adfuller(df['qty'],autolag='AIC')[1]
    is_adf_func = lambda x: 1 if x < 0.05 else 0
    is_adf = is_adf_func(adf_values)
    result=pd.DataFrame({'store_id':df['store_id'].iloc[0],'is_adf':[is_adf]})
    return result

schema_cycle = StructType([
    StructField("shop_number", StringType()),
    StructField("item_number", StringType()),
    StructField("cycle", IntegerType())])

@pandas_udf(schema_cycle, functionType=PandasUDFType.GROUPED_MAP)
def cycle_test(df,check_len=7):

    """
     check series the cycle
    :param df: data of series
    :param check_len: will check cycle length,eg weekly:7,month:30,year:12
    :return:int,1 is exit cycle,other is 0
    """

    assert check_len <=2,('cycle length must >2')
    df = df.sort_values(by=['dt'], ascending=True)
    acf_values=acf(df['label'],nlags=df.shape[0]-1)
    loc_max_index=signal.argrelextrema(acf_values,comparator=np.greater,order=check_len//2)

    #7 is weekly cycle if month data series can choice 12
    #occur local max index in cycle for len, as be exit cycle
    cycle_check=[i for i in loc_max_index[0] if i%check_len==0]
    is_cycle=lambda  x : 1 if cycle_check else 0
    cycle_result=is_cycle(cycle_check)
    result = pd.DataFrame({'shop_number':df['shop_number'].iloc[0],'item_number':df['item_number'].iloc[0], 'cycle': [cycle_result]})
    return result


def model_input():
    data_sql = """select shop_id,sku_id,dt,label from temp.dataset_feature"""
    data = spark.sql(data_sql)
    return data

if __name__ == '__main__':
    data=model_input()
    data =data.na.fill(0)
    item_adfuller = data.groupby(['shop_id','sku_id']).apply(adf_test)
    item_adfuller.createOrReplaceTempView('item_adfuller')
    item_cycle = data.groupby(['shop_id','sku_id']).apply(cycle_test)
    try:
        feature_df_tab.write.mode("append").format('hive').saveAsTable('temp.item_adfuller_cycle_table')
    except:
        item_cycle.createOrReplaceTempView('item_cycle')
        spark.sql("""drop table if exists temp.item_adfuller_cycle_table""")
        spark.sql("""create table temp.item_adfuller_cycle_table as 
        select a.shop_id,a.sku_id,a.cycle,b.adfuller
        from item_cycle a 
        left join item_adfuller b
        on a.shop_id=b.shop_id and a.sku_id=b.sku_id""")

以上只是工程实践角度实现对序列描述和检验的方法,其中如中断时长乃至计算方式并不是一个严格意义上的学术称谓,或许还有更严谨和先进的方式,欢迎交流。

热门文章

暂无图片
编程学习 ·

Java输出数组的内容

Java输出数组的内容_一万个小时-CSDN博客_java打印数组内容1. 输出内容最常见的方式// List<String>类型的列表List<String> list new ArrayList<String>();list.add("First");list.add("Second");list.add("Third");list.ad…
暂无图片
编程学习 ·

母螳螂的“魅惑之术”

在它们对大蝗虫发起进攻的时候&#xff0c;我认认真真地观察了一次&#xff0c;因为它们突然像触电一样浑身痉挛起来&#xff0c;警觉地面对限前这个大家伙&#xff0c;然后放下自己优雅的身段和祈祷的双手&#xff0c;摆出了一个可怕的姿势。我被眼前的一幕吓到了&#xff0c;…
暂无图片
编程学习 ·

疯狂填词 mad_libs 第9章9.9.2

#win7 python3.7.0 import os,reos.chdir(d:\documents\program_language) file1open(.\疯狂填词_d9z9d2_r.txt) file2open(.\疯狂填词_d9z9d2_w.txt,w) words[ADJECTIVE,NOUN,VERB,NOUN] str1file1.read()#方法1 for word in words :word_replaceinput(fEnter a {word} :)str1…
暂无图片
编程学习 ·

HBASE 高可用

为了保证HBASE是高可用的,所依赖的HDFS和zookeeper也要是高可用的. 通过参数hbase.rootdir指定了连接到Hadoop的地址,mycluster表示为Hadoop的集群. HBASE本身的高可用很简单,只要在一个健康的集群其他节点通过命令 hbase-daemon.sh start master启动一个Hmaster进程,这个Hmast…
暂无图片
编程学习 ·

js事件操作语法

一、事件的绑定语法 语法形式1 事件监听 标签对象.addEventListener(click,function(){}); 语法形式2 on语法绑定 标签对象.onclick function(){} on语法是通过 等于赋值绑定的事件处理函数 , 等于赋值本质上执行的是覆盖赋值,后赋值的数据会覆盖之前存储的数据,也就是on…
暂无图片
编程学习 ·

Photoshop插件--晕影动态--选区--脚本开发--PS插件

文章目录1.插件界面2.关键代码2.1 选区2.2 动态晕影3.作者寄语PS是一款栅格图像编辑软件&#xff0c;具有许多强大的功能&#xff0c;本文演示如何通过脚本实现晕影动态和选区相关功能&#xff0c;展示从互联网收集而来的一个小插件&#xff0c;供大家学习交流&#xff0c;请勿…
暂无图片
编程学习 ·

vs LNK1104 无法打开文件“xxx.obj”

写在前面&#xff1a; 向大家推荐两本新书&#xff0c;《深度学习计算机视觉实战》和《学习OpenCV4&#xff1a;基于Python的算法实战》。 《深度学习计算机视觉实战》讲了计算机视觉理论基础&#xff0c;讲了案例项目&#xff0c;讲了模型部署&#xff0c;这些项目学会之后可以…
暂无图片
编程学习 ·

工业元宇宙的定义与实施路线图

工业元宇宙的定义与实施路线图 李正海 1 工业元宇宙 给大家做一个关于工业元宇宙的定义。对于工业&#xff0c;从设计的角度来讲&#xff0c;现在的设计人员已经做到了普遍的三维设计&#xff0c;但是进入元宇宙时代&#xff0c;就不仅仅只是三维设计了&#xff0c;我们的目…
暂无图片
编程学习 ·

【leectode 2022.1.15】完成一半题目

有 N 位扣友参加了微软与力扣举办了「以扣会友」线下活动。主办方提供了 2*N 道题目&#xff0c;整型数组 questions 中每个数字对应了每道题目所涉及的知识点类型。 若每位扣友选择不同的一题&#xff0c;请返回被选的 N 道题目至少包含多少种知识点类型。 示例 1&#xff1a…
暂无图片
编程学习 ·

js 面试题总结

一、js原型与原型链 1. prototype 每个函数都有一个prototype属性&#xff0c;被称为显示原型 2._ _proto_ _ 每个实例对象都会有_ _proto_ _属性,其被称为隐式原型 每一个实例对象的隐式原型_ _proto_ _属性指向自身构造函数的显式原型prototype 3. constructor 每个prot…
暂无图片
编程学习 ·

java练习代码

打印自定义行数的空心菱形练习代码如下 import java.util.Scanner; public class daYinLengXing{public static void main(String[] args) {System.out.println("请输入行数");Scanner myScanner new Scanner(System.in);int g myScanner.nextInt();int num g%2;//…
暂无图片
编程学习 ·

RocketMQ-什么是死信队列?怎么解决

目录 什么是死信队列 死信队列的特征 死信消息的处理 什么是死信队列 当一条消息初次消费失败&#xff0c;消息队列会自动进行消费重试&#xff1b;达到最大重试次数后&#xff0c;若消费依然失败&#xff0c;则表明消费者在正常情况下无法正确地消费该消息&#xff0c;此时…
暂无图片
编程学习 ·

项目 cg day04

第4章 lua、Canal实现广告缓存 学习目标 Lua介绍 Lua语法 输出、变量定义、数据类型、流程控制(if..)、循环操作、函数、表(数组)、模块OpenResty介绍(理解配置) 封装了Nginx&#xff0c;并且提供了Lua扩展&#xff0c;大大提升了Nginx对并发处理的能&#xff0c;10K-1000K Lu…
暂无图片
编程学习 ·

输出三角形

#include <stdio.h> int main() { int i,j; for(i0;i<5;i) { for(j0;j<i;j) { printf("*"); } printf("\n"); } }
暂无图片
编程学习 ·

stm32的BOOTLOADER学习1

序言 最近计划学习stm32的BOOTLOADER学习,把学习过程记录下来 因为现在网上STM32C8T6还是比较贵的,根据我的需求flash空间小一些也可以,所以我决定使用stm32c6t6.这个芯片的空间是32kb的。 #熟悉芯片内部的空间地址 1、flash ROM&#xff1a; 大小32KB&#xff0c;范围&#xf…
暂无图片
编程学习 ·

通过awk和shell来限制IP多次访问之学不会你打死我

学不会你打死我 今天我们用shell脚本&#xff0c;awk工具来分析日志来判断是否存在扫描器来进行破解网站密码——限制访问次数过多的IP地址&#xff0c;通过Iptables来进行限制。代码在末尾 首先我们要先查看日志的格式&#xff0c;分析出我们需要筛选的内容&#xff0c;日志…
暂无图片
编程学习 ·

Python - 如何像程序员一样思考

在为计算机编写程序之前&#xff0c;您必须学会如何像程序员一样思考。学习像程序员一样思考对任何学生都很有价值。以下步骤可帮助任何人学习编码并了解计算机科学的价值——即使他们不打算成为计算机科学家。 顾名思义&#xff0c;Python经常被想要学习编程的人用作第一语言…
暂无图片
编程学习 ·

蓝桥杯python-数字三角形

问题描述 虽然我前后用了三种做法&#xff0c;但是我发现只有“优化思路_1”可以通过蓝桥杯官网中的测评&#xff0c;但是如果用c/c的话&#xff0c;每个都通得过&#xff0c;足以可见python的效率之低&#xff08;但耐不住人家好用啊&#xff08;哭笑&#xff09;&#xff09…