使用 hadoop api 获取任务日志

本贴最后更新于 1457 天前,其中的信息可能已经事过景迁

正常情况下我们可以通过开启日志聚合在 yarn webUi 上查看任务日志,但是当我们需要定制日志呈现方式时就需要使用到 hadoop 提供的 api 来获取。以下为 demo。

引入依赖

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>

代码实现:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HadoopLogUtils {
    private static Configuration yarnConfiguration;
    private static LogCLIHelpers logCLIHelpers;
    private static final Logger LOGGER = LoggerFactory.getLogger(HadoopLogUtils.class);

    /**
     * 初始化配置
     */
    static {
        yarnConfiguration = new YarnConfiguration();

        yarnConfiguration.addResource("core-site.xml");
        yarnConfiguration.addResource("hdfs-site.xml");
        yarnConfiguration.addResource("yarn-site.xml");
        logCLIHelpers = new LogCLIHelpers();
        logCLIHelpers.setConf(yarnConfiguration);
    }

    public static Configuration getYarnConfiguration() {
        return yarnConfiguration;
    }

    /**
     * 获取容器日志
     *
     * @param appId
     * @param containerId
     * @param nodeId
     * @param jobOwner
     * @param out
     * @return
     * @throws IOException
     */
    public static int dumpAContainersLogs(String appId, String containerId, String nodeId, String jobOwner, PrintStream out, List<String> logType) throws IOException {
        Path remoteRootLogDir = new Path(getYarnConfiguration().get("yarn.nodemanager.remote-app-log-dir", "/tmp/logs"));

        String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getYarnConfiguration());
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner, suffix);

        RemoteIterator nodeFiles;
        try {
            Path qualifiedLogDir = FileContext.getFileContext(getYarnConfiguration()).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(), getYarnConfiguration()).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException var16) {
            logDirNotExist(remoteAppLogDir.toString());
            return -1;
        }

        boolean foundContainerLogs = false;

        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = (FileStatus) nodeFiles.next();
            String fileName = thisNodeFile.getPath().getName();
            if (fileName.contains(LogAggregationUtils.getNodeString(nodeId)) && !fileName.endsWith(".tmp")) {
                AggregatedLogFormat.LogReader reader = null;

                try {
                    reader = new AggregatedLogFormat.LogReader(getYarnConfiguration(), thisNodeFile.getPath());
                    if (dumpAContainerLogs(containerId, reader, out, thisNodeFile.getModificationTime(), logType) > -1) {
                        foundContainerLogs = true;
                    }
                } finally {
                    if (reader != null) {
                        reader.close();
                    }

                }
            }
        }

        if (!foundContainerLogs) {
            containerLogNotFound(containerId);
            return -1;
        } else {
            return 0;
        }
    }

    private static void logDirNotExist(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not exist.");
        System.out.println("Log aggregation has not completed or is not enabled.");
    }

    private static void containerLogNotFound(String containerId) {
        System.out.println("Logs for container " + containerId + " are not present in this log-file.");
    }

    public static int dumpAContainerLogs(String containerIdStr, AggregatedLogFormat.LogReader reader, PrintStream out, long logUploadedTime, List<String> logType) throws IOException {
        AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();

        DataInputStream valueStream;
        for (valueStream = reader.next(key); valueStream != null && !key.toString().equals(containerIdStr); valueStream = reader.next(key)) {
            key = new AggregatedLogFormat.LogKey();
        }

        if (valueStream == null) {
            return -1;
        } else {
            boolean foundContainerLogs = false;

            while(true) {
                try {
                    readContainerLogs(valueStream, out, logUploadedTime, logType);
                    foundContainerLogs = true;
                } catch (EOFException var10) {
                    if (foundContainerLogs) {
                        return 0;
                    }

                    return -1;
                }
            }
        }
    }


    /**
     * 获取Containe nodeId列表
     *
     * @param appId
     * @param appOwner
     * @return
     * @throws IOException
     */
    public static Map<String, String> getContaines(String appId, String appOwner) throws IOException {
        Path remoteRootLogDir = new Path(yarnConfiguration.get(
                YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
                YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
        String user = appOwner;
        String logDirSuffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(yarnConfiguration);
        // TODO Change this to get a list of files from the LAS.
        Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, ConverterUtils.toApplicationId(appId), user, logDirSuffix);
        RemoteIterator<FileStatus> nodeFiles;
        Map<String, String> containerAndNodeId = new LinkedHashMap<>();
        try {
            Path qualifiedLogDir =
                    FileContext.getFileContext(yarnConfiguration).makeQualified(remoteAppLogDir);
            nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
                    yarnConfiguration).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException fnf) {
            logDirNotExist(remoteAppLogDir.toString());
            return Collections.emptyMap();
        }
        boolean foundAnyLogs = false;
        while (nodeFiles.hasNext()) {
            FileStatus thisNodeFile = nodeFiles.next();
            if (!thisNodeFile.getPath().getName()
                    .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
                AggregatedLogFormat.LogReader reader =
                        new AggregatedLogFormat.LogReader(yarnConfiguration, thisNodeFile.getPath());
                try {

                    DataInputStream valueStream;
                    AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey();
                    valueStream = reader.next(key);

                    while (valueStream != null) {
                        // Container: container_1587284642166_0001_01_000003 on master_42757
                        containerAndNodeId.put(key.toString(), thisNodeFile.getPath().getName().replace("_", ":"));

                        foundAnyLogs = true;
                        // Next container
                        key = new AggregatedLogFormat.LogKey();
                        valueStream = reader.next(key);
                    }
                } finally {
                    reader.close();
                }
            }
        }
        if (!foundAnyLogs) {
            emptyLogDir(remoteAppLogDir.toString());
            return Collections.emptyMap();
        }
        return containerAndNodeId;
    }


    private static void emptyLogDir(String remoteAppLogDir) {
        System.out.println(remoteAppLogDir + " does not have any log files.");
    }


    private static void readContainerLogs(DataInputStream valueStream,
                                          PrintStream out, long logUploadedTime, List<String> logType) throws IOException {
        byte[] buf = new byte[65535];

        String fileType = valueStream.readUTF();
        //if (logType.contains(fileType)) {
            String fileLengthStr = valueStream.readUTF();
            long fileLength = Long.parseLong(fileLengthStr);
            out.print("LogType:");
            out.println(fileType);
            if (logUploadedTime != -1) {
                out.print("Log Upload Time:");
                out.println(Times.format(logUploadedTime));
            }
            out.print("LogLength:");
            out.println(fileLengthStr);
            out.println("Log Contents:");

            long curRead = 0;
            long pendingRead = fileLength - curRead;
            int toRead =
                    pendingRead > buf.length ? buf.length : (int) pendingRead;
            int len = valueStream.read(buf, 0, toRead);
            while (len != -1 && curRead < fileLength) {
                out.write(buf, 0, len);
                curRead += len;

                pendingRead = fileLength - curRead;
                toRead =
                        pendingRead > buf.length ? buf.length : (int) pendingRead;
                len = valueStream.read(buf, 0, toRead);
            }
            out.println("End of LogType:" + fileType);
            out.println("");
       // }
    }

    /**
     * covert appId
     * @param appId
     * @return
     */
    public static ApplicationId convert(String appId) {
        return ConverterUtils.toApplicationId(appId);
    }


}

实现效果:

image20200428221031425.pngimage20200428221152282.png

源码地址: https://github.com/arrayMi/hadoop_learning

  • Hadoop

    Hadoop 是由 Apache 基金会所开发的一个分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

    82 引用 • 122 回帖 • 618 关注
  • Yarn
    10 引用 • 5 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...

推荐标签 标签

  • Sandbox

    如果帖子标签含有 Sandbox ,则该帖子会被视为“测试帖”,主要用于测试社区功能,排查 bug 等,该标签下内容不定期进行清理。

    369 引用 • 1212 回帖 • 581 关注
  • JavaScript

    JavaScript 一种动态类型、弱类型、基于原型的直译式脚本语言,内置支持类型。它的解释器被称为 JavaScript 引擎,为浏览器的一部分,广泛用于客户端的脚本语言,最早是在 HTML 网页上使用,用来给 HTML 网页增加动态功能。

    710 引用 • 1173 回帖 • 173 关注
  • Windows

    Microsoft Windows 是美国微软公司研发的一套操作系统,它问世于 1985 年,起初仅仅是 Microsoft-DOS 模拟环境,后续的系统版本由于微软不断的更新升级,不但易用,也慢慢的成为家家户户人们最喜爱的操作系统。

    215 引用 • 462 回帖
  • Unity

    Unity 是由 Unity Technologies 开发的一个让开发者可以轻松创建诸如 2D、3D 多平台的综合型游戏开发工具,是一个全面整合的专业游戏引擎。

    25 引用 • 7 回帖 • 247 关注
  • Hexo

    Hexo 是一款快速、简洁且高效的博客框架,使用 Node.js 编写。

    21 引用 • 140 回帖 • 25 关注
  • 开源

    Open Source, Open Mind, Open Sight, Open Future!

    396 引用 • 3416 回帖
  • Log4j

    Log4j 是 Apache 开源的一款使用广泛的 Java 日志组件。

    20 引用 • 18 回帖 • 43 关注
  • webpack

    webpack 是一个用于前端开发的模块加载器和打包工具,它能把各种资源,例如 JS、CSS(less/sass)、图片等都作为模块来使用和处理。

    41 引用 • 130 回帖 • 294 关注
  • GitBook

    GitBook 使您的团队可以轻松编写和维护高质量的文档。 分享知识,提高团队的工作效率,让用户满意。

    3 引用 • 8 回帖 • 1 关注
  • 服务器

    服务器,也称伺服器,是提供计算服务的设备。由于服务器需要响应服务请求,并进行处理,因此一般来说服务器应具备承担服务并且保障服务的能力。

    124 引用 • 580 回帖
  • 周末

    星期六到星期天晚,实行五天工作制后,指每周的最后两天。再过几年可能就是三天了。

    14 引用 • 297 回帖 • 1 关注
  • 知乎

    知乎是网络问答社区,连接各行各业的用户。用户分享着彼此的知识、经验和见解,为中文互联网源源不断地提供多种多样的信息。

    10 引用 • 66 回帖 • 1 关注
  • SQLite

    SQLite 是一个进程内的库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是全世界使用最为广泛的数据库引擎。

    4 引用 • 7 回帖 • 4 关注
  • Wide

    Wide 是一款基于 Web 的 Go 语言 IDE。通过浏览器就可以进行 Go 开发,并有代码自动完成、查看表达式、编译反馈、Lint、实时结果输出等功能。

    欢迎访问我们运维的实例: https://wide.b3log.org

    30 引用 • 218 回帖 • 605 关注
  • 机器学习

    机器学习(Machine Learning)是一门多领域交叉学科,涉及概率论、统计学、逼近论、凸分析、算法复杂度理论等多门学科。专门研究计算机怎样模拟或实现人类的学习行为,以获取新的知识或技能,重新组织已有的知识结构使之不断改善自身的性能。

    76 引用 • 37 回帖
  • InfluxDB

    InfluxDB 是一个开源的没有外部依赖的时间序列数据库。适用于记录度量,事件及实时分析。

    2 引用 • 55 关注
  • jsDelivr

    jsDelivr 是一个开源的 CDN 服务,可为 npm 包、GitHub 仓库提供免费、快速并且可靠的全球 CDN 加速服务。

    5 引用 • 31 回帖 • 44 关注
  • 笔记

    好记性不如烂笔头。

    303 引用 • 777 回帖
  • SendCloud

    SendCloud 由搜狐武汉研发中心孵化的项目,是致力于为开发者提供高质量的触发邮件服务的云端邮件发送平台,为开发者提供便利的 API 接口来调用服务,让邮件准确迅速到达用户收件箱并获得强大的追踪数据。

    2 引用 • 8 回帖 • 439 关注
  • 酷鸟浏览器

    安全 · 稳定 · 快速
    为跨境从业人员提供专业的跨境浏览器

    3 引用 • 59 回帖 • 25 关注
  • TextBundle

    TextBundle 文件格式旨在应用程序之间交换 Markdown 或 Fountain 之类的纯文本文件时,提供更无缝的用户体验。

    1 引用 • 2 回帖 • 47 关注
  • Docker

    Docker 是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的操作系统上。容器完全使用沙箱机制,几乎没有性能开销,可以很容易地在机器和数据中心中运行。

    476 引用 • 899 回帖 • 2 关注
  • iOS

    iOS 是由苹果公司开发的移动操作系统,最早于 2007 年 1 月 9 日的 Macworld 大会上公布这个系统,最初是设计给 iPhone 使用的,后来陆续套用到 iPod touch、iPad 以及 Apple TV 等产品上。iOS 与苹果的 Mac OS X 操作系统一样,属于类 Unix 的商业操作系统。

    84 引用 • 139 回帖
  • 人工智能

    人工智能(Artificial Intelligence)是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门技术科学。

    75 引用 • 145 回帖 • 2 关注
  • Git

    Git 是 Linux Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。

    205 引用 • 357 回帖
  • Mobi.css

    Mobi.css is a lightweight, flexible CSS framework that focus on mobile.

    1 引用 • 6 回帖 • 697 关注
  • FreeMarker

    FreeMarker 是一款好用且功能强大的 Java 模版引擎。

    23 引用 • 20 回帖 • 427 关注