《十堂课学习 Flink》第七章:Flink 流计算保存结果env.sinkTo(以 Kafka / ES 为例)

这篇具有很好参考价值的文章主要介绍了《十堂课学习 Flink》第七章:Flink 流计算保存结果env.sinkTo(以 Kafka / ES 为例)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

7.1 本章概述

本章基于Elastic Search 以及 Kafka 用于介绍 Flink 的 sinkTo / addSink 的 API 的使用方法,此外我们还会实现几个通用的方法,在实际应用场景中,针对不同的实体类可以通过这个通用的方法来完成,而不需要一对一地实现。

7.2 效果展示

flink 写数据到ES

此外,还将编写一个通用的工具类,用于 kafka 的序列化与反序列化,即对于某实体类(不管是什么类型的实体类均可),我们通过监听kafka的topic进行序列化,得到期望的实体类;并将flink执行结果进行反序列化,转换为json字符串,写回 kafka 。

7.3 代码编写

根据实际情况添加相关依赖,与 Flink / Kafka 相关的依赖我们在前面章节已经陈述过,这里我们只添加额外添加的依赖,也就是

其中 flink.version1.14.6

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.45</version>
        </dependency>
    </dependencies>

7.3.1 sinkTo ES

这个案例中,只是把三个字符串写入 ES。

package cn.smileyan.demo;

import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * 创建索引 hello-world 并向这个索引中写入数据
 * @author Smileyan
 */
public class FlinkElasticsearchDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool parameters = MultipleParameterTool.fromArgs(args);

        // 设置 Elasticsearch 集群的连接信息 比如 http://localhost:9200 | http://localhost:9200,http://localhost:9200
        String urls = parameters.get("es.hosts", "http://localhost:9200");
        final String regex = ",";
        List<HttpHost> httpHosts = Arrays.stream(urls.split(regex)).map(HttpHost::create).collect(Collectors.toList());

        // 创建 ElasticsearchSinkFunction 用于将数据写入 Elasticsearch
        final String index = parameters.get("index", "hello-world");
        ElasticsearchSink.Builder<String> esSinkBuilder = buildSinkEs(httpHosts, index);

        // 创建数据流
        DataStream<String> resultStream = env.fromElements("data1", "data2", "data3");

        // 将结果 Sink 到 Elasticsearch
        resultStream.addSink(esSinkBuilder.build());

        // 执行 Flink 作业
        env.execute("Flink Elasticsearch Example");
    }

    /**
     * 获取用于将字符串数据 Sink 到 Elasticsearch 的 ElasticsearchSink.Builder 对象。
     *
     * @param httpHosts Elasticsearch 集群的连接信息
     * @param index 存储到 Elasticsearch 中的索引
     * @return ElasticsearchSink.Builder 对象
     */
    private static ElasticsearchSink.Builder<String> buildSinkEs(List<HttpHost> httpHosts, String index) {
        ElasticsearchSinkFunction<String> elasticsearchSinkFunction = (element, ctx, indexer) -> {
            // 将数据写入 Elasticsearch
            Map<String, String> json = new HashMap<>(1);
            json.put("data", element);
            IndexRequest source = Requests.indexRequest().index(index).source(json);
            indexer.add(source);
        };

        // 创建 ElasticsearchSink
        ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);

        // 设置批量写入的缓冲区大小,可根据实际情况调整
        esSinkBuilder.setBulkFlushMaxActions(1);
        return esSinkBuilder;
    }
}

7.3.2 将实体类反序列化后写 ES


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/**
 *
 * @author Smileyan
 */
public class GenericFlinkElasticsearchDemo {
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool parameters = MultipleParameterTool.fromArgs(args);

        // 设置 Elasticsearch 集群的连接信息 比如 http://localhost:9200 | http://localhost:9200,http://localhost:9200
        String urls = parameters.get("es.hosts", "http://localhost:9200");
        final String regex = ",";
        List<HttpHost> httpHosts = Arrays.stream(urls.split(regex)).map(HttpHost::create).collect(Collectors.toList());

        // 创建 ElasticsearchSinkFunction 用于将数据写入 Elasticsearch
        final String index = parameters.get("es.index", "hello-world-2");

        GenericElasticsearchSinkFunction<Student> elasticsearchSinkFunction = new GenericElasticsearchSinkFunction<>(index);
        // 创建 ElasticsearchSink
        ElasticsearchSink.Builder<Student> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);

        DataStream<Student> inputDataStream = env.fromElements(
                new Student(1, "张三", 18),
                new Student(2, "李四", 20),
                new Student(3, "王五", 22)
        );

        // 将结果 Sink 到 Elasticsearch
        inputDataStream.addSink(esSinkBuilder.build());

        // 执行 Flink 作业
        env.execute("Flink Elasticsearch Example");
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

其中我们实现了一个可复用的类方法。

import com.alibaba.fastjson2.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;


/**
 * 将实体类数据写入到ES中
 * @author Smileyan
 */
public class GenericElasticsearchSinkFunction<T> implements ElasticsearchSinkFunction<T> {
    private final String indexName;

    public GenericElasticsearchSinkFunction(String indexName) {
        this.indexName = indexName;
    }

    @Override
    public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
        // 将数据写入 Elasticsearch 的逻辑
        if (element != null) {
            JSONObject jsonMap = JSONObject.from(element);
            IndexRequest indexRequest = Requests.indexRequest()
                    .index(indexName)
                    .source(jsonMap);

            // 将 IndexRequest 添加到 RequestIndexer
            requestIndexer.add(indexRequest);
        }
    }
}

7.3.3 通用 kafka 的序列化、反序列化方法

这个案例中,我们通过实现两个关键接口,使得我们在应用 kafka 的sink和source的过程中更加方便。通过泛型来泛化序列化和反序列化的类类型,从而实现高可复用性。

package cn.smileyan.demos;

import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.nio.charset.StandardCharsets;


/**
 * 将字节码数据进行序列化,以及将实体类转换
 * @author smileyan
 * @param <O> 实体类
 */
@Slf4j
public class CommonEntitySchema<O> implements DeserializationSchema<O>, SerializationSchema<O> {

    private final Class<O> clazz;

    public CommonEntitySchema(Class<O> clazz) {
        this.clazz = clazz;
    }

    @Override
    public O deserialize(byte[] message) {
        try {
            String str = new String(message, StandardCharsets.UTF_8);
            log.info("kafka received message: {}", str);
            return JSON.parseObject(str, clazz);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
        return null;
    }

    @Override
    public boolean isEndOfStream(O nextElement) {
        return false;
    }

    @Override
    public TypeInformation<O> getProducedType() {
        return TypeInformation.of(clazz);
    }

    @Override
    public byte[] serialize(O element) {
        return JSON.toJSONBytes(element);
    }
}

接着我们基于这个已经实现的类,完成 kafka 的 source 和 sink 的例子。

package cn.smileyan.demos;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 将 kafka 数据进行序列化,转换为实体类
 * @author smileyan
 */
@Slf4j
public class FlinkKafkaEntitySinkToExample {
    /**
     * 参数解释:
     *  -bs broker 地址
     *  -kcg kafka consumer group
     *  -it kafka 输入数据 topic
     *  -ot kafka 输出数据 topic
     *  -ct 是否自动创建输入 topic
     *  -pt topic 分区数
     *  -rf topic 副本数
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final MultipleParameterTool cmd = MultipleParameterTool.fromArgs(args);
        final String bootstrapServer = cmd.get("bs", "localhost:9092");
        final String kafkaConsumerGroup = cmd.get("kcg", "flink-consumer");
        final String inputTopic = cmd.get("it", "quickstart-events");
        final String outputTopic = cmd.get("ot", "quickstart-results");
        final boolean createTopic = cmd.getBoolean("ct", false);
        final Long transactionTimeout = cmd.getLong("tt", 300000L);

        log.info("broker is {} and topic is {}", bootstrapServer, inputTopic);

        // 如果 topic 不存在,并且开启了由 flink 任务创建 TOPIC。默认不开启,一般情况下,部署人员应当根据实际情况设置不同topic的并行度,副本数
        if (createTopic) {
            final int partitions = cmd.getInt("pt", 1);
            final short replicationFactor = cmd.getShort("rf", (short) 1);
            createTopic(bootstrapServer, inputTopic, partitions, replicationFactor);
        }

        final KafkaSource<Student> kafkaSource = KafkaSource.<Student>builder()
                .setGroupId(kafkaConsumerGroup)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setBootstrapServers(bootstrapServer)
                .setTopics(inputTopic)
                .setValueOnlyDeserializer(new CommonEntitySchema<>(Student.class))
                .build();

        Properties properties = new Properties();
        properties.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(transactionTimeout));
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        final KafkaSink<Student> kafkaSink = KafkaSink.<Student>builder()
                .setKafkaProducerConfig(properties)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(outputTopic)
                        .setValueSerializationSchema(new CommonEntitySchema<>(Student.class))
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        final DataStreamSource<Student> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 过滤掉反序列化失败的对象,只保留正确的对象
        SingleOutputStreamOperator<Student> out1 = kafkaStream.filter(Objects::nonNull)
                .map(student -> {
                    log.info("filter none objects is {}", student);
                    return student;
                });

        // 只选择年纪小于 10 的对象
        SingleOutputStreamOperator<Student> out2 = out1.filter(student -> student.getAge() != null && student.getAge() < 10)
                .map(student -> {
                    log.info("filter age < 10: {}", student);
                    return student;
                });

        out2.sinkTo(kafkaSink);

        env.execute("Flink Kafka Example");
    }

    /**
     * 如果 TOPIC 不存在则创建该 TOPIC
     * @param bootstrapServer kafka broker 地址
     * @param topic 想要创建的 TOPIC
     * @param partitions 并行度
     * @param replicationFactor 副本数
     */
    public static void createTopic(String bootstrapServer,
                                   String topic,
                                   int partitions,
                                   int replicationFactor) throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                log.info("created topic: {}", topic);
            }
        }
    }

    @Data
    static class Student {
        private String name;
        private Integer age;
    }

}

这个代码中,我们还通过 filter 方法过滤掉反序列化失败的、反序列化后不满足要求的实体。也就是把满足条件的实体类 sinkTo Kafka。

此外,官方推荐使用的也是这类方法,而不是以前官方实现的 FlinkKafkaXXXXX 之类的。

7.3.4 实现一个通用的KafkaSourceBuilder 以及 KafkaSinkBuilder 方法

假设,我们需要开发多个 Flink Job ,也就是多个不同的 main 方法类,需要针对不同类型的 kafka 消息完成对应的计算过程。也就是说 Kafka 的 source 以及 Kafka 的 sink 的实体类可能大不相同,但是这个过程以及参数是一样的。

所以我们不妨实现一个通过的 source 方法以及 sink 方法,通过 main 方法中的 String[] args 来确定 kafka 的 source 地址以及 kafka 的 sink 地址。

具体的实现方法如下:

package cn.smileyan.demos;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 通用的基于args的kafka构建
 * @author smileyan
 */
@Slf4j
public class KafkaArgsBuilder {
    /**
     * 构建参数
     */
    private final MultipleParameterTool parameterTool;

    public KafkaArgsBuilder(String[] args) {
        parameterTool = MultipleParameterTool.fromArgs(args);
    }

    /**
     * 构建kafka sink
     * @param clazz 实体类class
     * @param <E> 实体类泛型
     * @return kafka sink 对象
     */
    public <E> KafkaSink<E> buildSink(Class<E> clazz) {
        final String bs = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);
        final String ot = parameterTool.getRequired(KafkaArgs.OUTPUT_TOPIC.key);

        return KafkaSink.<E>builder()
                .setBootstrapServers(bs)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(ot)
                        .setValueSerializationSchema(new CommonEntitySchema<>(clazz))
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }

    /**
     * 构建kafka source
     * @param clazz 实体类class
     * @param <E> 实体类泛型
     * @return kafka source 对象
     */
    public <E> KafkaSource<E> buildSource(Class<E> clazz) throws ExecutionException, InterruptedException {
        final String kafkaConsumerGroup = parameterTool.getRequired(KafkaArgs.KAFKA_CONSUMER_GROUP.key);
        final String bootstrapServer = parameterTool.getRequired(KafkaArgs.BOOTSTRAP_SERVER.key);
        final String inputTopic = parameterTool.getRequired(KafkaArgs.INPUT_TOPIC.key);
        final boolean createTopic = parameterTool.has(KafkaArgs.CREATE_TOPIC.key);

        if (createTopic) {
            final int partition = parameterTool.getInt(KafkaArgs.CREATE_TOPIC_PARTITION.key, 1);
            final short replicationFactor = parameterTool.getShort(KafkaArgs.REPLICATION_FACTOR.key, (short) 1);
            createTopic(bootstrapServer, inputTopic, partition, replicationFactor);
        }

        return KafkaSource.<E>builder()
                .setGroupId(kafkaConsumerGroup)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setBootstrapServers(bootstrapServer)
                .setTopics(inputTopic)
                .setValueOnlyDeserializer(new CommonEntitySchema<>(clazz))
                .build();
    }

    public enum KafkaArgs {
        /*
         * kafka 服务地址
         */
        BOOTSTRAP_SERVER("bs"),

        /*
         * kafka 消费者组
         */
        KAFKA_CONSUMER_GROUP("kcg"),

        /*
         * kafka 输入主题
         */
        INPUT_TOPIC("it"),

        /*
         * kafka 输出主题
         */
        OUTPUT_TOPIC("ot"),

        /*
         * 是否自动创建主题
         */
        CREATE_TOPIC("ct"),

        /*
         * 分区数
         */
        CREATE_TOPIC_PARTITION("pt"),

        /*
         * 副本数
         */
        REPLICATION_FACTOR("rf");

        private final String key;

        KafkaArgs(String key) {
            this.key = key;
        }
    }

    /**
     * 如果 TOPIC 不存在则创建该 TOPIC
     * @param bootstrapServer kafka broker 地址
     * @param topic 想要创建的 TOPIC
     * @param partitions 并行度
     * @param replicationFactor 副本数
     */
    public static void createTopic(String bootstrapServer,
                                   String topic,
                                   int partitions,
                                   int replicationFactor) throws ExecutionException, InterruptedException {
        Properties adminProperties = new Properties();
        adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        try (AdminClient adminClient = AdminClient.create(adminProperties)) {
            if (!adminClient.listTopics().names().get().contains(topic)) {
                NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                log.info("created topic: {}", topic);
            }
        }
    }
}

对应地,添加一个使用的案例:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 将实体类序列化并写入 kafka
 * @author smileyan
 */
public class KafkaArgsSinkDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);

        DataStream<Student> dataStream = env.fromElements(
                new Student(1, "张三", 18),
                new Student(2, "李四", 20),
                new Student(3, "王五", 22)
        );

        KafkaSink<Student> sinker = kafkaArgsBuilder.buildSink(Student.class);

        dataStream.sinkTo(sinker);

        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

构建一个 kafka source 例子。

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 使用 KafkaArgsBuilder 的 source 案例
 * @author smileyan
 */
public class KafkaArgsSourceDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final KafkaArgsBuilder kafkaArgsBuilder = new KafkaArgsBuilder(args);

        final KafkaSource<String> source = kafkaArgsBuilder.buildSource(String.class);

        final DataStreamSource<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        kafkaStream.print();

        env.execute("Flink Kafka Example");
    }
}

7.4 本章小结

前面章节已经比较详细的介绍了项目的基础环境以及注意事项,本章也作为一个承接点,把最基本的内容大致过一遍以后,将会基于我们假设的需求场景,开发对应的处理流程。包括对多条 kafka 消息的聚合操作,滑动窗口操作,自定义的 map 以及 flatMap 操作,实际业务场景中可能用的同步、异步 http 请求等。

希望可以帮助到您,非常感谢小伙伴们的支持 !

感谢阅读 ~

感谢点赞 ~

《十堂课学习 Flink》第七章:Flink 流计算保存结果env.sinkTo(以 Kafka / ES 为例),学习,flink,kafka文章来源地址https://www.toymoban.com/news/detail-858849.html

到了这里,关于《十堂课学习 Flink》第七章:Flink 流计算保存结果env.sinkTo(以 Kafka / ES 为例)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 云计算技术与应用课后答案第七章

    第七章 云技术 1、下列设备(或资源)可以成为虚拟内容的是: (ABCD) A、CPU B、内存 C、存储 D、网络 2、下列不属于虚拟化技术所具备的优势的是: (D) A、资源共享 B、负载动态优化 C、节能环保 D、人工管理 3、(B )年,IBM公司发布的IBM7044 被认为是最早在商业系统上实现虚拟化。

    2024年01月17日
    浏览(13)
  • 计算机组成原理——第七章输入输出系统(上)

    计算机组成原理——第七章输入输出系统(上)

    如若来世再相见,半点朱唇尽我尝 操作系统中探讨输入输出系统更多的是软件部分,就是操作系统如何来管理这些设备,机组这门课更加注意硬件的实现,现代计算机大致可有分成两个部分,主机和外设, 下图是单总线的结构,IO接口下面会连接一个个的设备,IO接口的背后

    2023年04月21日
    浏览(16)
  • 第七章 网络安全 | 计算机网络(谢希仁 第八版)

    第七章 网络安全 | 计算机网络(谢希仁 第八版)

    计算机网络面临的安全威胁和一些主要问题 对称密匙密码体制和公匙密码体制 数字签名与鉴别 网络安全协议IPsec协议族和运输安全协议SSl/TSL的要点 系统安全:防火墙和入侵检测 7.1.1 计算机网络面临的安全性威胁 计算机网络上的通信面临的威胁可以分为两大类: 被动攻击(

    2024年02月07日
    浏览(11)
  • python学习笔记:第七章面向对象

    与java类似,python作为一种面向对象的编程语言,也可以创建自定义的对象和类。 它的特性主要有:继承,封装,多态,方法,属性,超类 两者转换 每个类对应每个对象,下面有类变量 起到封装变量,封装函数,代码的作用 格式 实例化类,给类赋值 在创建对象时,就会被

    2024年02月13日
    浏览(15)
  • SVM——《统计学习方法第七章》

    SVM——《统计学习方法第七章》

    在第二章中我们学过感知机,它是最小化所有误分类点到超平面的距离之和, M 为误分类点的集合,得到的分离超平面是不唯一的。 min ⁡ ω , b [ − ∑ x i ∈ M y i ( ω ⋅ x i + b ) ] min_{omega,b}[-sum_{x_i in M}y_i (omegacdot x_i+b)] ω , b min ​ [ − x i ​ ∈ M ∑ ​ y i ​ ( ω ⋅ x i ​

    2024年02月08日
    浏览(7)
  • 计算机组成原理---第七章输入/输出系统---I/O方式

    计算机组成原理---第七章输入/输出系统---I/O方式

    中断服务程序是软件完成的,因此它一般是操作系统的模块,通过调用完成。 每个中断都有一个类型号,每个中断类型号都对应一个中断服务程序,每个中断服务程序都有一个入口地址,CPU必须找到入口地址,即中断向量。 中断向量是入口地址 ,因此中断向量地址是入口地

    2024年02月11日
    浏览(6)
  • 《操作系统真象还原》学习笔记:第七章 中断

    《操作系统真象还原》学习笔记:第七章 中断

    由于 CPU 获知了计算机中发生的某些事,CPU 暂停正在执行的程序,转而去执行处理该事件的程序,当这段程序执行完毕后,CPU 继续执行刚才的程序。整个过程称为中断处理,也称为中断。 把中断按事件来源分类,来自CPU外部的中断就称为外部中断,来自CPU内部的中断就称为

    2024年02月11日
    浏览(14)
  • 图像处理与计算机视觉--第七章-神经网络-单层感知器

    图像处理与计算机视觉--第七章-神经网络-单层感知器

      下图是一个简单的感知器模型图:                 在输出层的每一个节点上接受来自输入层的加权和作为输出层的净输入如下所示: n e t j ′ = ∑ i = 1 n w i j x i mathrm{net}_{j}^{prime}=sum_{i=1}^{n}w_{ij}x_{i} net j ′ ​ = i = 1 ∑ n ​ w ij ​ x i ​ 输出的值由激活

    2024年02月06日
    浏览(9)
  • 计算机网络重点概念整理-第七章 网络安全【期末复习|考研复习】

    计算机网络复习系列文章传送门: 第一章 计算机网络概述 第二章 物理层 第三章 数据链路层 第四章 网络层 第五章 传输层 第六章 应用层 第七章 网络安全 计算机网络整理-简称缩写 给大家整理了一下计算机网络中的重点概念,以供大家期末复习和考研复习的时候使用。 参

    2024年02月07日
    浏览(16)
  • 计算机操作系统第四版第七章文件管理—课后习题答案

    计算机操作系统第四版第七章文件管理—课后习题答案

        数据项:是最低级的数据组织形式,可以分为两种类型:基本数据项和组合数据项。基本数据项是用于描述一个对象的某种属性的字符集,是数据组织中可以命名的最小逻辑数据单位,又称为字段。组合数据项是由若干个基本数据项组成的,简称组项。 记录:记录是一组

    2024年02月03日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包