技术

IT这个行当之需求与时间管理 golang结构体和包中的类型或基础类型定义方法 golang中结构体的初始化方法(new方法) 项目管理总结 python函数式编程之-装饰器(Decorators) python文件批量处理 Go,互联网时代的C Python推导式演变(Comprehensions) 项目管理感悟 golang学习简单例子 了解GitHub工作流【译】 PHP Socket的使用 Apache 日志文件格式及简单处理 Python脚本--下载合并SAE日志 PHP命名空间及自动加载 基于CSS3实现尖角面包屑 部署Ceilometer到已有环境中 OpenStack Ceilometer Collector代码解读 OpenStack Ceilometer数据存储与API源码解析 OpenStack Ceilometer中的Pipeline机制 OpenStack Ceilometer Compute Agent源码解读 学习Python动态扩展包stevedore 学习Python的ABC模块 Python包管理工具setuptools详解 OpenStack Horizon 中文本地化 WSGI学习 在虚拟机单机部署OpenStack Grizzly 学习使用python打包工具distutils python包工具之间的关系 给OpenStack创建Ubuntu镜像 OpenStack Grizzly Multihost部署文档 为什么使用pip而不是easy_install HTML中meta标签viewpoint的作用 交互式编程-IPython 页面提速之——数据缓存 给OpenStack创建Win7镜像 Ceilometer的命令行使用 部署一个ceilometer-horizon项目 给OpenStack创建Windows XP镜像 几种企业的存储系统 概念模型、逻辑模型、物理模型的区别 五中常见的开源协议整理(BSD,Apache,GPL,LGPL,MIT) OpenStack监控项目Ceilometer的一些术语 VNC和远程桌面的区别 OpenStack Ceilometer项目简介 虚拟化与云计算中KVM,Xen,Qemu的区别和联系 调试和修改OpenStack中的Horizon部分 JavaScript变量作用域 kanyun worker原理 kanyun server服务 在OpenStack中部署kanyun kanyun的api-client命令 sae下的python开发部署和一个简单例子 OpenStack Nova内部机制 PHP可变变量 JS中防止浏览器屏蔽window.open PHP操作Session的原理及提升安全性时的一个问题

标签


OpenStack Ceilometer中的Pipeline机制

2013年06月11日

Pipeline作用

Pipeline翻译过来是管道的意思,它在ceilometer中的作用类似一个过滤器一样,或者说是转换器。它是一般是一个方法链,这个方法链前面一部分是transformer,transformer实现数据转换等功能,它可以有多个。在链尾是publisher,它负责将数据发送到AMQP中去。

Pipeline定义

在Agent的构造函数中,第一个创建的属性就是pipeline_manager

self.pipeline_manager = pipeline.setup_pipeline(
    transformer.TransformerExtensionManager(
        'ceilometer.transformer',
    ),
    publisher.PublisherExtensionManager(
        'ceilometer.publisher',
    ),
)

其中,transformer和publisher来自setup.cfg中

ceilometer.transformer =
    accumulator = ceilometer.transformer.accumulator:TransformerAccumulator

ceilometer.publisher =
    meter_publisher = ceilometer.publisher.meter:MeterPublisher
    meter = ceilometer.publisher.meter:MeterPublisher
    udp = ceilometer.publisher.udp:UDPPublisher

Pipeline设置

它调用了ceilometer.pipeline中的setup_pipline(),setup_pipeline()通过导入pipeline.yaml,获得pipeline的配置,默认配置如下

name: meter_pipeline
interval: 600
counters:
    - "*"
transformers:
publishers:
    - meter

最后它创建了一个PipelineManager给self.pipeline_manager

PipelineManager(pipeline_cfg,transformer_manager,publisher_manager)

PipelineManager做的事情如下:

self.pipelines = [Pipeline(pipedef, publisher_manager,transformer_manager) for pipedef in cfg]

它遍历cfg中对pipeline的定义(基本都是一个),然后生成一个Pipeline对象数组

def __init__(self, cfg, publisher_manager, transformer_manager):
    self.cfg = cfg
    self.name = cfg['name']
    self.interval = int(cfg['interval'])
    self.counters = cfg['counters']
    self.publishers = cfg['publishers']
    self.transformer_cfg = cfg['transformers'] or []
    self.publisher_manager = publisher_manager
    self._check_counters()
    self._check_publishers(cfg, publisher_manager)
    self.transformers = self._setup_transformers(cfg, transformer_manager)

Pipeline的构造函数如上,它的作用是处理transformer和publisher

Pipeline使用

pipeline的使用位置在agent.py中

def setup_polling_tasks(self):
    polling_tasks = {}
    for pipeline, pollster in itertools.product(
            self.pipeline_manager.pipelines,
            self.pollster_manager.extensions):
        for counter in pollster.obj.get_counter_names():
            if pipeline.support_counter(counter):
                polling_task = polling_tasks.get(pipeline.interval, None)
                if not polling_task:
                    polling_task = self.create_polling_task()
                    polling_tasks[pipeline.interval] = polling_task
                polling_task.add(pollster, [pipeline])
                break

    return polling_tasks

首先通过product生成pipeline和pollster的笛卡尔积,即将每一个pollster都和pipeline配对(一般只有一个pipeline)。

pipeline.support_counter(counter)用来检查这个counter是否同意进入pipeline

另外,每一个polling_task都在构造函数中

self.publish_context = pipeline.PublishContext(
    agent_manager.context,
    cfg.CONF.counter_source)

声明了一个pipeline.PublishContext()

在执行task.poll_and_publish前,会先执行

def add(self, pollster, pipelines):
    self.publish_context.add_pipelines(pipelines)
    self.pollsters.update([pollster])

即增加一个pipeline管理

最后是publish_context的使用位置

def poll_and_publish_instances(self, instances):
    with self.publish_context as publisher:
        for instance in instances:
            if getattr(instance, 'OS-EXT-STS:vm_state', None) != 'error':
                for pollster in self.pollsters:
                    publisher(list(pollster.obj.get_counters(
                        self.manager,
                        instance)))

这里用了with as作为pipeline的管理

__enter__()中,定义了一个函数

def p(counters):
    for p in self.pipelines:
        p.publish_counters(self.context,
                           counters,
                           self.source)

这个函数执行pipeline中的publish_counters,然后最终的执行代码来自

ext.obj.publish_counters(ctxt, counters, source)

即publisher的publishcounters,在这里是`ceilometer.publisher.meter:publishcounters`,它负责将数据发送到AMQP中去

总结

Pipeline机制一定程度上保证了数据的安全性,并且可以统一数据格式,了解它对于了解Ceilometer的数据流有一定帮助