2012-03-30 13 views
11

¿Podría alguien señalarme en la dirección correcta con respecto a la conversión de marco de tiempo de datos de OHLC con Pandas? Lo que trato de hacer es construir un Dataframe con datos para plazos más largos, dados los datos con un marco de tiempo más bajo.Conversión de datos de stock de OHLC en un marco de tiempo diferente con python y pandas

Por ejemplo, dado que tengo la (M1) siguientes datos de un minuto:

     Open High  Low Close Volume 
Date              
1999-01-04 10:22:00 1.1801 1.1819 1.1801 1.1817  4 
1999-01-04 10:23:00 1.1817 1.1818 1.1804 1.1814  18 
1999-01-04 10:24:00 1.1817 1.1817 1.1802 1.1806  12 
1999-01-04 10:25:00 1.1807 1.1815 1.1795 1.1808  26 
1999-01-04 10:26:00 1.1803 1.1806 1.1790 1.1806  4 
1999-01-04 10:27:00 1.1801 1.1801 1.1779 1.1786  23 
1999-01-04 10:28:00 1.1795 1.1801 1.1776 1.1788  28 
1999-01-04 10:29:00 1.1793 1.1795 1.1782 1.1789  10 
1999-01-04 10:31:00 1.1780 1.1792 1.1776 1.1792  12 
1999-01-04 10:32:00 1.1788 1.1792 1.1788 1.1791  4 

que tiene valores de volumen abierto, alto, bajo, Close (OHLC) y por cada minuto me gustaría construir un conjunto de lecturas de 5 minutos (M5), que se vería así:

     Open High  Low Close Volume 
Date              
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789  91 
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791  16 

lo tanto, el flujo de trabajo es que:

  • abierto es el Abierto de t por primera fila de la timewindow
  • alta es la más alta de alta en el timewindow
  • baja es la más baja baja
  • Close es el último Cerrar
  • volumen es simplemente una suma de volúmenes

Hay son algunos problemas sin embargo:

  • los datos han huecos (nota no hay ninguna fila 10:30:00)
  • los intervalos de 5 minutos tienen que comenzar en el tiempo de ronda, p. M5 comienza a las 10:25:00 No 10:22:00
  • primer set, incompleta puede omitirse como en este ejemplo, o incluido (para que pudiéramos tener 10:20:00 entrada 5 minutos)

El Pandas documentation on up-down sampling da un ejemplo, pero usan el valor medio como el valor de la fila muestreada arriba, que no funcionará aquí. He intentado usar groupby y agg, pero fue en vano. Para obtener el más alto, el más bajo y el más bajo, puede que no sea tan difícil, pero no tengo idea de cómo obtener el primer cierre y el último cierre.

Lo que intenté decir algo en la línea de:

grouped = slice.groupby(dr5minute.asof).agg( 
    { 'Low': lambda x : x.min()[ 'Low' ], 'High': lambda x : x.max()[ 'High' ] } 
) 

pero da lugar a error de seguimiento, lo que no entiendo:

In [27]: grouped = slice.groupby(dr5minute.asof).agg({ 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] }) 
--------------------------------------------------------------------------- 
IndexError        Traceback (most recent call last) 
/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <module>() 
----> 1 grouped = slice.groupby(dr5minute.asof).agg({ 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] }) 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in agg(self, func, *args, **kwargs) 
    242   See docstring for aggregate 
    243   """ 
--> 244   return self.aggregate(func, *args, **kwargs) 
    245 
    246  def _iterate_slices(self): 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, arg, *args, **kwargs) 
    1153      colg = SeriesGroupBy(obj[col], column=col, 
    1154           grouper=self.grouper) 
-> 1155      result[col] = colg.aggregate(func) 
    1156 
    1157    result = DataFrame(result) 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in aggregate(self, func_or_funcs, *args, **kwargs) 
    906     return self._python_agg_general(func_or_funcs, *args, **kwargs) 
    907    except Exception: 
--> 908     result = self._aggregate_named(func_or_funcs, *args, **kwargs) 
    909 
    910    index = Index(sorted(result), name=self.grouper.names[0]) 

/usr/lib/python2.7/site-packages/pandas/core/groupby.pyc in _aggregate_named(self, func, *args, **kwargs) 
    976    grp = self.get_group(name) 
    977    grp.name = name 
--> 978    output = func(grp, *args, **kwargs) 
    979    if isinstance(output, np.ndarray): 
    980     raise Exception('Must produce aggregated value') 

/work/python/fxcruncher/<ipython-input-27-df50f9522a2f> in <lambda>(x) 
----> 1 grouped = slice.groupby(dr5minute.asof).agg({ 'Low' : lambda x : x.min()[ 'Low' ], 'High' : lambda x : x.max()[ 'High' ] }) 

IndexError: invalid index to scalar variable. 

Así que cualquier ayuda en hacer eso sería apreciado enormemente. Si el camino que elegí no va a funcionar, sugiera otro enfoque relativamente eficiente (tengo millones de filas). Algunos recursos sobre el uso de Pandas para el procesamiento financiero también serían buenos.

+2

¿Qué versión de pandas estás usando? Estamos trabajando en una serie de series de tiempo renovadas que simplificarán enormemente este proceso, pero es probable que no se lance hasta fines de abril. Sin embargo, también puede haber un error que reparar aquí –

+1

Hola Wes, estoy usando 0.7.2. Supongo que esperar a que se publique una nueva versión es una opción factible ya que no tengo una fecha límite para esta transformación (necesito los datos para la investigación privada). Sin embargo, ¡déjame aprovechar la ocasión para agradecerte por esforzarte en desarrollar pandas! :) – kgr

+0

Y en cuanto al posible error, tenga en cuenta que no especifiqué valores para todas las columnas en el Dataframe (solo 2 de 5), si eso es lo que quería decir. – kgr

Respuesta

8

Su enfoque es correcto, pero falla porque cada función en el dict-of-functions aplicado a agg() recibe un objeto Serie que refleja la columna que coincide con el valor de la clave. Por lo tanto, no es necesario volver a filtrar en la etiqueta de la columna. Con esto, y asumiendo groupby conserva orden, puede cortar la serie para extraer el primer/último elemento de las columnas Abrir/Cerrar (nota: groupby documentación no pretende mantener el orden de los datos originales series, pero parece en práctica.)

In [50]: df.groupby(dr5minute.asof).agg({'Low': lambda s: s.min(), 
             'High': lambda s: s.max(), 
             'Open': lambda s: s[0], 
             'Close': lambda s: s[-1], 
             'Volume': lambda s: s.sum()}) 
Out[50]: 
         Close High  Low Open Volume 
key_0              
1999-01-04 10:20:00 1.1806 1.1819 1.1801 1.1801  34 
1999-01-04 10:25:00 1.1789 1.1815 1.1776 1.1807  91 
1999-01-04 10:30:00 1.1791 1.1792 1.1776 1.1780  16 

Para referencia, aquí es una tabla para resumir los esperados tipos de entrada y de salida de una función de agregación basadas en el tipo de objeto GroupBy y cómo la función (s) de agregación es/son pasados ​​a agg() .

    agg() method  agg func agg func   agg() 
        input type  accepts  returns   result 
GroupBy Object 
SeriesGroupBy  function   Series  value    Series 
        dict-of-funcs Series  value    DataFrame, columns match dict keys 
        list-of-funcs Series  value    DataFrame, columns match func names 
DataFrameGroupBy function   DataFrame Series/dict/ary DataFrame, columns match original DataFrame 
        dict-of-funcs Series  value    DataFrame, columns match dict keys, where dict keys must be columns in original DataFrame 
        list-of-funcs Series  value    DataFrame, MultiIndex columns (original cols x func names) 

De la tabla anterior, si la agregación requiere el acceso a más de un columna, la única opción es pasar una única función a un objeto DataFrameGroupBy . Por lo tanto, una forma alternativa de realizar la tarea original es definir una función como la siguiente:

def ohlcsum(df): 
    df = df.sort() 
    return { 
     'Open': df['Open'][0], 
     'High': df['High'].max(), 
     'Low': df['Low'].min(), 
     'Close': df['Close'][-1], 
     'Volume': df['Volume'].sum() 
     } 

y aplicar agg() con ella:

In [30]: df.groupby(dr5minute.asof).agg(ohlcsum) 
Out[30]: 
         Open High  Low Close Volume 
key_0              
1999-01-04 10:20:00 1.1801 1.1819 1.1801 1.1806  34 
1999-01-04 10:25:00 1.1807 1.1815 1.1776 1.1789  91 
1999-01-04 10:30:00 1.1780 1.1792 1.1776 1.1791  16 

Aunque los pandas pueden ofrecer un poco de limpiador incorporado en magia en el futuro, con suerte esto explica cómo trabajar con las capacidades agg() de hoy.

+0

Antes que nada, gracias por esta respuesta muy informativa :) ¿Podrías escribir qué versión de Pandas estás usando y también quizás cómo creaste el 'dr5minute'? Parece que tengo un problema con 'groupby (dr5minute.asof)', simplemente devuelve solo un grupo. – kgr

+0

Supongo que el problema podría ser con el índice incorrecto. Creo que las fechas del CSV no se analizan correctamente en las fechas ... pero esa es otra historia, por lo que no es necesario debatir en los comentarios. Gracias de nuevo @crewburm! – kgr

+0

De nada, @kgr. Estoy usando 0.7.2. Para interpretar las fechas en un csv, consulte el argumento '' converters'' de '' read_csv() ''. – Garrett

7

sólo para ser de utilidad para otros usuarios con una versión más reciente de los pandas, hay un método volver a muestrear muy rápido y útil para llevar a cabo la misma tarea:

ohlc_dict = {                            
'Open':'first',                          
'High':'max',                          
'Low':'min',                           
'Close': 'last',                          
'Volume': 'sum' 
} 

df.resample('5T', how=ohlc_dict, closed='left', label='left') 
+0

Aparece esta advertencia 'FutureWarning: cómo en .resample() está en desuso la nueva sintaxis es .resample (...) .. apply ()' ¿Cómo debe ser después de la depreciación? – RaduS

+1

@RaduS, '' 'df.resample ('5T', closed = 'left', label = 'left'). Apply (ohlc_dict)' '' – wombatonfire

0

Dentro de mi principal() función que estoy recibiendo la transmisión de datos bid/ask. a continuación, hago lo siguiente:

df = pd.DataFrame([]) 

for msg_type, msg in response.parts(): 
    if msg_type == "pricing.Price": 
     sd = StreamingData(datetime.now(),instrument_string(msg), 
          mid_string(msg),account_api,account_id, 
          's','5min',balance) 
     df = df.append(sd.df()) 
     sd.resample(df) 

creé una clase StreamingData() que toma la entrada proporcionada (también creado algunas funciones para romper la oferta/demanda de datos en componentes individuales (oferta, demanda, mediados, instrumentos, etc.).

La belleza de esto es todo lo que tiene que hacer es cambiar los 's' y'5 minutos' a lo plazos desea. Ajústelo en 'm' y 'D' para obtener los precios diarios por minuto.

Esto es lo que mi StreamingData() parece:

class StreamingData(object): 
def __init__(self, time, instrument, mid, api, _id, xsec, xmin, balance): 
    self.time = time 
    self.instrument = instrument 
    self.mid = mid 
    self.api = api 
    self._id = _id 
    self.xsec = xsec 
    self.xmin = xmin 
    self.balance = balance 
    self.data = self.resample(self.df()) 

def df(self): 
    df1 = pd.DataFrame({'Time':[self.time]}) 
    df2 = pd.DataFrame({'Mid':[float(self.mid)]}) 
    df3 = pd.concat([df1,df2],axis=1,join='inner') 
    df = df3.set_index(['Time']) 
    df.index = pd.to_datetime(df.index,unit='s') 
    return df 

def resample(self, df): 
    xx = df.to_period(freq=self.xsec) 
    openCol = xx.resample(self.xmin).first() 
    highCol = xx.resample(self.xmin).max() 
    lowCol = xx.resample(self.xmin).min() 
    closeCol = xx.resample(self.xmin).last() 
    self.data = pd.concat([openCol,highCol,lowCol,closeCol], 
          axis=1,join='inner') 
    self.data['Open'] = openCol.round(5) 
    self.data['High'] = highCol.round(5) 
    self.data['Low'] = lowCol.round(5) 
    self.data['Close'] = closeCol.round(5) 
    return self.data 

Así que se necesita en los datos de StreamingData(), crea un tiempo de trama de datos indexada dentro df(), lo anexa, a continuación, envía hasta remuestreo(). Los precios que calculo están basados ​​en: mid = (bid + ask)/2

Cuestiones relacionadas